Commit c0395e67 authored by Simon Pastor's avatar Simon Pastor
Browse files

bws tier 2-3-4

parent b4e5701a
Loading
Loading
Loading
Loading
+15 −7
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	bws "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-bw-sharing"
@@ -205,6 +206,7 @@ var nextTransactionId = 1
const redisAddr string = "meep-redis-master:6379"

var rc *redis.Connector
var mutex sync.Mutex

const DEFAULT_TC_ENGINE_DB = 0

@@ -231,14 +233,14 @@ func Init() (err error) {
	rc.DBFlush(moduleTcEngine)
	rc.DBFlush(moduleMetrics)

	bwSharing, err = bws.NewBwSharing("default", updateOneFilterRule, applyOneFilterRule)
	bwSharing, err = bws.NewBwSharing("default", redisAddr, updateOneFilterRule, applyOneFilterRule)

	if err != nil {
		log.Error("Failed to create a bwSharing object. Error: ", err)
		return err
	}
	bwSharing.UpdateControls()
	//	_ = bwSharing.Start()
	_ = bwSharing.Start()

	// Initialize TC Engine with current active scenario & LB rules
	processActiveScenarioUpdate()
@@ -291,7 +293,6 @@ func processActiveScenarioUpdate() {
	}

	// Parse scenario
	go bwSharing.ParseScenarioUpdate(scenario)
	parseScenario(scenario)

	switch tcEngineState {
@@ -1034,7 +1035,7 @@ func updateOneFilterRule(dstName string, srcName string, rate float64) {

					filterInfo.DataRate = int(THROUGHPUT_UNIT * rate)

					_ = updateNetCharRule(&filterInfo)
					_ = updateNetCharRule(&filterInfo, true)
					break
				}
			}
@@ -1089,6 +1090,7 @@ func applyNetCharRules() {
			filterInfo.LatencyCorrelation = COMMON_CORRELATION
			value = netCharTable[i][j][PACKET_LOSS]
			filterInfo.PacketLoss = value
			//throughput is always updated to make sure a value will be set in the DB is bwSharing is not active at the time of setting the value in the DB
			value = netCharTable[i][j][THROUGHPUT]
			filterInfo.DataRate = value
			needUpdateFilter := false
@@ -1155,7 +1157,7 @@ func applyNetCharRules() {
					_ = updateFilterRule(&filterInfo)
				} else {
					if needUpdateNetChar {
						_ = updateNetCharRule(&filterInfo)
						_ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning())
					}
				}
			}
@@ -1200,7 +1202,9 @@ func updateFilterRule(filterInfo *FilterInfo) error {
	m_shape["ifb_uniqueId"] = ifbNumberStr

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
	mutex.Lock()
	err = rc.SetEntry(keyName, m_shape)
	mutex.Unlock()
	if err != nil {
		return err
	}
@@ -1227,7 +1231,7 @@ func updateFilterRule(filterInfo *FilterInfo) error {
	return nil
}

func updateNetCharRule(filterInfo *FilterInfo) error {
func updateNetCharRule(filterInfo *FilterInfo, updateDataRate bool) error {
	var err error
	var keyName string

@@ -1244,11 +1248,15 @@ func updateNetCharRule(filterInfo *FilterInfo) error {
	m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10)
	m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10)
	m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10)
	if updateDataRate {
		m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10)
	}
	m_shape["ifb_uniqueId"] = ifbNumberStr

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
	mutex.Lock()
	err = rc.SetEntry(keyName, m_shape)
	mutex.Unlock()
	if err != nil {
		return err
	}
+4 −1
Original line number Diff line number Diff line
@@ -751,7 +751,10 @@ func cmdSetIfb(shape map[string]string) error {
	if delayVariation != "0" {
		normalDistributionStr = "distribution normal"
	}
	str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "% rate " + dataRate + "bit"
	str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "%"
	if dataRate != "0" {
		str = str + " rate " + dataRate + "bit"
	}
	_, err := cmdExec(str)
	if err != nil {
		return err
+64 −26
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package bws

import (
	"encoding/json"
	"errors"
	"strconv"
	"sync"
@@ -27,10 +28,19 @@ import (
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

const redisAddr string = "meep-redis-master:6379"

var BW_SHARING_CONTROLS_DB = 0

// BwAlgorithm
type BwSharingAlgorithm interface {
	initDefaultConfigAttributes()
	parseScenario(ceModel.Scenario)
	updateDefaultConfigAttributes(string, string)
	tickerFunction()
	deallocateBandwidthSharing()
	allocateBandwidthSharing()
	setParentBwSharing(*BwSharing)
}

// BwSharing -
type BwSharing struct {
	name           string
@@ -42,6 +52,7 @@ type BwSharing struct {
	updateFilterCB func(string, string, float64)
	applyFilterCB  func()
	config         ConfigurationAttributes
	bwAlgo         BwSharingAlgorithm
}

// ConfigurationAttributes -
@@ -49,20 +60,18 @@ type ConfigurationAttributes struct {
	Action              string
	RecalculationPeriod int
	LogVerbose          bool
	EnableTier1         bool
	EnableTier2         bool
	EnableTier3         bool
}

// NewBwSharing - Create, Initialize and connect
func NewBwSharing(name string, updateFilterRule func(string, string, float64), applyFilterRule func()) (bw *BwSharing, err error) {
func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, string, float64), applyFilterRule func()) (*BwSharing, error) {
	var err error
	if name == "" {
		err = errors.New("Missing bwSharing name")
		log.Error(err)
		return nil, err
	}

	bw = new(BwSharing)
	//	bw = new(BwSharing)
	var bw BwSharing
	bw.name = name
	bw.isStarted = false
	bw.isReady = false
@@ -75,9 +84,16 @@ func NewBwSharing(name string, updateFilterRule func(string, string, float64), a
	}
	log.Info("Connected to redis DB")

	//bw.bwAlgo = new(DefaultBwSharingAlgorithm)
	//var algo BwSharingAlgorithm
	//algo = &dd
	algo := []BwSharingAlgorithm{&DefaultBwSharingAlgorithm{}}
	//algo = &DefaultBwSharingAlgorithm{}
	bw.bwAlgo = algo[0] //algoBwSharingAlgorithm
	//&DefaultBwSharingAlgorithm{}
	// Subscribe to Pub-Sub events for MEEP Controller
	// NOTE: Current implementation is RedisDB Pub-Sub
	err = bw.rcCtrlEng.Subscribe(channelBwSharingControls)
	err = bw.rcCtrlEng.Subscribe(channelBwSharingControls, channelCtrlActive)
	if err != nil {
		log.Error("Failed to subscribe to Pub/Sub events on channelBwSharingControls. Error: ", err)
		return nil, err
@@ -89,7 +105,8 @@ func NewBwSharing(name string, updateFilterRule func(string, string, float64), a
	bw.applyFilterCB = applyFilterRule
	//get values from the DB, or defaults
	bw.InitDefaultConfigAttributes()
	return bw, nil
	bw.bwAlgo.setParentBwSharing(&bw)
	return &bw, nil
}

// InitDefaultConfigAttributes - Initialize some default variables used by the generic bws object
@@ -97,7 +114,7 @@ func (bw *BwSharing) InitDefaultConfigAttributes() {

	bw.config.RecalculationPeriod = defaultTickerPeriod
	//initialize the default config attributes specific to the algorithm choosen
	initDefaultConfigAttributes()
	bw.bwAlgo.initDefaultConfigAttributes()
}

// Run - Listening event
@@ -114,23 +131,46 @@ func (bw *BwSharing) eventHandler(channel string, payload string) {
	case channelBwSharingControls:
		log.Debug("Event received on channel: ", channelBwSharingControls)
		bw.UpdateControls()
	case channelCtrlActive:
		log.Debug("Event received on channel: ", channelCtrlActive)
		bw.ProcessActiveScenarioUpdate()
	default:
		log.Warn("Unsupported channel")
	}
}

// ParseScenarioUpdate - Parse the scenario and extract the information usefull to the bws algorithms by calling their specific implementations
func (bw *BwSharing) ParseScenarioUpdate(scenario ceModel.Scenario) {
	if bw.isStarted {
// ProcessActiveScenarioUpdate
func (bw *BwSharing) ProcessActiveScenarioUpdate() {
	// Retrieve active scenario from DB
	jsonScenario, err := bw.rcCtrlEng.JSONGetEntry(moduleCtrlEngine+":"+typeActive, ".")
	if err != nil {
		log.Error(err.Error())
		bw.StopScenario()
		return
	}
	// Unmarshal Active scenario
	var scenario ceModel.Scenario
	err = json.Unmarshal([]byte(jsonScenario), &scenario)
	if err != nil {
		log.Error(err.Error())
		bw.StopScenario()
		return
	}

	// Parse scenario
	if bw.isStarted {
		bw.mutex.Lock()
		bw.isReady = false
		parseScenario(scenario)
		bw.bwAlgo.parseScenario(scenario)
		bw.isReady = true
		bw.mutex.Unlock()
	}
}

// StopScenario
func (bw *BwSharing) StopScenario() {
}

// updateFilter - Updates the filters in the DB that will be pushed to the sidecars
func (bw *BwSharing) updateFilter(dst string, src string, value float64) {
	bw.updateFilterCB(dst, src, value)
@@ -160,9 +200,6 @@ func (bw *BwSharing) getControlsEntryHandler(key string, fields map[string]strin
	actionName := ""
	tickerPeriod := defaultTickerPeriod
	logVerbose := false
	enableTier1 := false
	enableTier2 := false
	enableTier3 := false

	for fieldName, fieldValue := range fields {
		switch fieldName {
@@ -178,16 +215,13 @@ func (bw *BwSharing) getControlsEntryHandler(key string, fields map[string]strin
				logVerbose = true
			}
		default:
			updateDefaultConfigAttributes(fieldName, fieldValue)
		}
		bw.bwAlgo.updateDefaultConfigAttributes(fieldName, fieldValue)
	}

	bw.config.Action = actionName
	bw.config.RecalculationPeriod = tickerPeriod
	bw.config.LogVerbose = logVerbose
	bw.config.EnableTier1 = enableTier1
	bw.config.EnableTier2 = enableTier2
	bw.config.EnableTier3 = enableTier3

	//for debug
	bw.ApplyAction()
@@ -212,14 +246,18 @@ func (bw *BwSharing) ApplyAction() (err error) {
	return nil
}

// IsRunning()
func (bw *BwSharing) IsRunning() bool {
	return bw.isStarted
}

// Start - starts bwSharing distribution calculations
func (bw *BwSharing) Start() (err error) {
	bw.isStarted = true
	bw.isReady = true
	bw.ticker = time.NewTicker(time.Duration(bw.config.RecalculationPeriod) * time.Millisecond)

	allocateBandwidthSharing()
	//bw.ParseScenarioUpdate()
	bw.bwAlgo.allocateBandwidthSharing()
	go func() {
		for range bw.ticker.C {

@@ -227,7 +265,7 @@ func (bw *BwSharing) Start() (err error) {
			if bw.isReady {
				bw.mutex.Lock()
				bw.isReady = false
				tickerFunction(bw.rcCtrlEng, bw.config.LogVerbose, bw.updateFilterCB, bw.applyFilterCB)
				bw.bwAlgo.tickerFunction()
				bw.isReady = true
				bw.mutex.Unlock()
			}
@@ -244,6 +282,6 @@ func (bw *BwSharing) Stop() {
		log.Debug("BwSharing computation stopped ", bw.name)
		bw.isStarted = false
		bw.isReady = false
		cleanUp()
		bw.bwAlgo.deallocateBandwidthSharing()
	}
}
+43 −0
Original line number Diff line number Diff line
@@ -17,42 +17,27 @@
package bws

import (
	"fmt"
	"testing"
	"time"

        redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
        ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model"

)

const redisAddr string = "localhost:30379"

//default allocations
func allocateBandwidthSharing() {
	//default allocations
}

//updates attributes through DB
func updateDefaultConfigAttributes(fieldName string, fieldValue string) {

        switch(fieldName) {
	default:
	}
}

//periodic function invoked
func tickerFunction(rcCtrlEng *redis.Connector, logVerbose bool, updateFilter func(string, string, float64), applyFilter func()) {

}
func TestBwsharingBasic(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

//reseting previous allocations
func cleanUp() {
}
	bwSharing, err := NewBwSharing("test", redisAddr, nil, nil)
	if err != nil {
		t.Errorf("Failed to create a bwSharing object.")
	} else {
		bwSharing.UpdateControls()
		_ = bwSharing.Start()

//default init values if not configured
func initDefaultConfigAttributes() {
		time.Sleep(1000 * time.Millisecond)
		bwSharing.Stop()
	}

//scenario parsing function
func parseScenario(scenario ceModel.Scenario) {
        log.Debug("parseScenario")
}
+271 −168

File changed.

Preview size limit exceeded, changes collapsed.

Loading