Unverified Commit 22f4002c authored by Mike Roy's avatar Mike Roy Committed by GitHub
Browse files

Merge pull request #29 from pastorsx/sp_dev_sp30_bws

Bandwidth Sharing Feature Complete
parents b4e5701a c35f2555
Loading
Loading
Loading
Loading
+22 −17
Original line number Diff line number Diff line
@@ -21,6 +21,7 @@ import (
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	bws "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-bw-sharing"
@@ -37,7 +38,6 @@ import (
const moduleTcEngine string = "tc-engine"
const moduleCtrlEngine string = "ctrl-engine"
const moduleMgManager string = "mg-manager"
const moduleMetrics string = "metrics"

const typeActive string = "active"
const typeNet string = "net"
@@ -205,6 +205,7 @@ var nextTransactionId = 1
const redisAddr string = "meep-redis-master:6379"

var rc *redis.Connector
var mutex sync.Mutex

const DEFAULT_TC_ENGINE_DB = 0

@@ -229,16 +230,15 @@ func Init() (err error) {

	// Flush any remaining TC Engine rules
	rc.DBFlush(moduleTcEngine)
	rc.DBFlush(moduleMetrics)

	bwSharing, err = bws.NewBwSharing("default", updateOneFilterRule, applyOneFilterRule)
	bwSharing, err = bws.NewBwSharing("default", redisAddr, updateOneFilterRule, applyOneFilterRule)

	if err != nil {
		log.Error("Failed to create a bwSharing object. Error: ", err)
		return err
	}
	bwSharing.UpdateControls()
	//	_ = bwSharing.Start()
	_ = bwSharing.Start()

	// Initialize TC Engine with current active scenario & LB rules
	processActiveScenarioUpdate()
@@ -291,7 +291,6 @@ func processActiveScenarioUpdate() {
	}

	// Parse scenario
	go bwSharing.ParseScenarioUpdate(scenario)
	parseScenario(scenario)

	switch tcEngineState {
@@ -304,6 +303,7 @@ func processActiveScenarioUpdate() {

	case stateReady:
		// Update Network Characteristic matrix table
		mutex.Lock()
		refreshNetCharTable()

		//debug for the tables
@@ -314,6 +314,7 @@ func processActiveScenarioUpdate() {

		// Apply network characteristic rules
		applyNetCharRules()
		mutex.Unlock()

		//Update the Db for state information (only transactionId for now)
		updateDbState(nextTransactionId)
@@ -451,7 +452,6 @@ func stopScenario() {
	scenarioName = ""

	rc.DBFlush(moduleTcEngine)
	rc.DBFlush(moduleMetrics)

	_ = rc.Publish(channelTcNet, "delAll")
	_ = rc.Publish(channelTcLb, "delAll")
@@ -1015,6 +1015,7 @@ func updateDbState(transactionId int) {
func updateOneFilterRule(dstName string, srcName string, rate float64) {
	var filterInfo FilterInfo

	mutex.Lock()
	for _, dstElement := range indexToNetElemMap {
		if dstElement.Name == dstName {
			for _, storedFilterInfo := range dstElement.FilterInfoList {
@@ -1034,12 +1035,13 @@ func updateOneFilterRule(dstName string, srcName string, rate float64) {

					filterInfo.DataRate = int(THROUGHPUT_UNIT * rate)

					_ = updateNetCharRule(&filterInfo)
					_ = updateNetCharRule(&filterInfo, true)
					break
				}
			}
		}
	}
	mutex.Unlock()
}

func applyOneFilterRule() {
@@ -1089,6 +1091,7 @@ func applyNetCharRules() {
			filterInfo.LatencyCorrelation = COMMON_CORRELATION
			value = netCharTable[i][j][PACKET_LOSS]
			filterInfo.PacketLoss = value
			//throughput is always updated to make sure a value will be set in the DB is bwSharing is not active at the time of setting the value in the DB
			value = netCharTable[i][j][THROUGHPUT]
			filterInfo.DataRate = value
			needUpdateFilter := false
@@ -1149,14 +1152,12 @@ func applyNetCharRules() {
			if needCreate {
				//follows +2 convention since one odd and even number reserved for the same rule (applied and updated one)
				dstElementPtr.NextUniqueNumber += 2
				_ = updateFilterRule(&filterInfo)
				_ = updateFilterRule(&filterInfo, !bwSharing.IsRunning())
			} else {
				if needUpdateFilter {
					_ = updateFilterRule(&filterInfo)
				} else {
					if needUpdateNetChar {
						_ = updateNetCharRule(&filterInfo)
					}
					_ = updateFilterRule(&filterInfo, !bwSharing.IsRunning())
				} else if needUpdateNetChar {
					_ = updateNetCharRule(&filterInfo, !bwSharing.IsRunning())
				}
			}
			indexToNetElemMap[j] = *dstElementPtr
@@ -1179,7 +1180,7 @@ func deleteFilterRule(filterInfo *FilterInfo) error {
	return nil
}

func updateFilterRule(filterInfo *FilterInfo) error {
func updateFilterRule(filterInfo *FilterInfo, updateDataRate bool) error {
	var err error
	var keyName string

@@ -1196,7 +1197,9 @@ func updateFilterRule(filterInfo *FilterInfo) error {
	m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10)
	m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10)
	m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10)
	if updateDataRate {
		m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10)
	}
	m_shape["ifb_uniqueId"] = ifbNumberStr

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
@@ -1227,7 +1230,7 @@ func updateFilterRule(filterInfo *FilterInfo) error {
	return nil
}

func updateNetCharRule(filterInfo *FilterInfo) error {
func updateNetCharRule(filterInfo *FilterInfo, updateDataRate bool) error {
	var err error
	var keyName string

@@ -1244,7 +1247,9 @@ func updateNetCharRule(filterInfo *FilterInfo) error {
	m_shape["delayVariation"] = strconv.FormatInt(int64(filterInfo.LatencyVariation), 10)
	m_shape["delayCorrelation"] = strconv.FormatInt(int64(filterInfo.LatencyCorrelation), 10)
	m_shape["packetLoss"] = strconv.FormatInt(int64(filterInfo.PacketLoss), 10)
	if updateDataRate {
		m_shape["dataRate"] = strconv.FormatInt(int64(filterInfo.DataRate), 10)
	}
	m_shape["ifb_uniqueId"] = ifbNumberStr

	keyName = moduleTcEngine + ":" + typeNet + ":" + filterInfo.PodName + ":shape:" + ifbNumberStr
+9 −4
Original line number Diff line number Diff line
@@ -228,10 +228,15 @@ func (u *destination) processRxTx(rc *redis.Connector) {
	var throughputStats = make(map[string]interface{})
	throughputStats[u.remoteName] = throughputVal

	//store as an individual dataset but also as an aggregate for throughput only (for now)
	_ = rc.SetEntry(moduleMetrics+":"+PodName+":"+u.remoteName, stats)
	//throughput stats will be appended if the entry didn't exist or replaced if it does
	//store statistics but only if the entry exists
	key := moduleMetrics + ":" + PodName + ":" + u.remoteName
	if rc.EntryExists(key) {
		_ = rc.SetEntry(key, stats)
	}
	key = moduleMetrics + ":" + PodName + ":throughput"
	if rc.EntryExists(key) {
		_ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats)
	}

	//pacing the logs in ES
	elasticLogPacing++
+4 −1
Original line number Diff line number Diff line
@@ -751,7 +751,10 @@ func cmdSetIfb(shape map[string]string) error {
	if delayVariation != "0" {
		normalDistributionStr = "distribution normal"
	}
	str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "% rate " + dataRate + "bit"
	str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "%"
	if dataRate != "0" {
		str = str + " rate " + dataRate + "bit"
	}
	_, err := cmdExec(str)
	if err != nil {
		return err
+69 −45
Original line number Diff line number Diff line
@@ -17,6 +17,7 @@
package bws

import (
	"encoding/json"
	"errors"
	"strconv"
	"sync"
@@ -27,10 +28,19 @@ import (
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

const redisAddr string = "meep-redis-master:6379"

var BW_SHARING_CONTROLS_DB = 0

// BwAlgorithm
type BwSharingAlgorithm interface {
	init(*redis.Connector, func(string, string, float64), func())
	initDefaultConfigAttributes()
	parseScenario(ceModel.Scenario)
	updateDefaultConfigAttributes(string, string)
	tickerFunction()
	deallocateBandwidthSharing()
	allocateBandwidthSharing()
}

// BwSharing -
type BwSharing struct {
	name      string
@@ -39,9 +49,8 @@ type BwSharing struct {
	ticker    *time.Ticker
	rcCtrlEng *redis.Connector
	mutex     sync.Mutex
	updateFilterCB func(string, string, float64)
	applyFilterCB  func()
	config ConfigurationAttributes
	bwAlgo BwSharingAlgorithm
}

// ConfigurationAttributes -
@@ -49,20 +58,17 @@ type ConfigurationAttributes struct {
	Action              string
	RecalculationPeriod int
	LogVerbose          bool
	EnableTier1         bool
	EnableTier2         bool
	EnableTier3         bool
}

// NewBwSharing - Create, Initialize and connect
func NewBwSharing(name string, updateFilterRule func(string, string, float64), applyFilterRule func()) (bw *BwSharing, err error) {
func NewBwSharing(name string, redisAddr string, updateFilterRule func(string, string, float64), applyFilterRule func()) (*BwSharing, error) {
	var err error
	if name == "" {
		err = errors.New("Missing bwSharing name")
		log.Error(err)
		return nil, err
	}

	bw = new(BwSharing)
	var bw BwSharing
	bw.name = name
	bw.isStarted = false
	bw.isReady = false
@@ -75,21 +81,24 @@ func NewBwSharing(name string, updateFilterRule func(string, string, float64), a
	}
	log.Info("Connected to redis DB")

	bw.bwAlgo = new(DefaultBwSharingAlgorithm)
	// Subscribe to Pub-Sub events for MEEP Controller
	// NOTE: Current implementation is RedisDB Pub-Sub
	err = bw.rcCtrlEng.Subscribe(channelBwSharingControls)
	err = bw.rcCtrlEng.Subscribe(channelBwSharingControls, channelCtrlActive)
	if err != nil {
		log.Error("Failed to subscribe to Pub/Sub events on channelBwSharingControls. Error: ", err)
		return nil, err
	}

	//delete pre-existent metrics rule in the DB if any
	bw.rcCtrlEng.DBFlush(moduleMetrics)

	go bw.Run()

	bw.updateFilterCB = updateFilterRule
	bw.applyFilterCB = applyFilterRule
	//get values from the DB, or defaults
	bw.InitDefaultConfigAttributes()
	return bw, nil
	bw.bwAlgo.init(bw.rcCtrlEng, updateFilterRule, applyFilterRule)
	return &bw, nil
}

// InitDefaultConfigAttributes - Initialize some default variables used by the generic bws object
@@ -97,7 +106,7 @@ func (bw *BwSharing) InitDefaultConfigAttributes() {

	bw.config.RecalculationPeriod = defaultTickerPeriod
	//initialize the default config attributes specific to the algorithm choosen
	initDefaultConfigAttributes()
	bw.bwAlgo.initDefaultConfigAttributes()
}

// Run - Listening event
@@ -114,31 +123,48 @@ func (bw *BwSharing) eventHandler(channel string, payload string) {
	case channelBwSharingControls:
		log.Debug("Event received on channel: ", channelBwSharingControls)
		bw.UpdateControls()
	case channelCtrlActive:
		log.Debug("Event received on channel: ", channelCtrlActive)
		bw.ProcessActiveScenarioUpdate()
	default:
		log.Warn("Unsupported channel")
	}
}

// ParseScenarioUpdate - Parse the scenario and extract the information usefull to the bws algorithms by calling their specific implementations
func (bw *BwSharing) ParseScenarioUpdate(scenario ceModel.Scenario) {
	if bw.isStarted {
// ProcessActiveScenarioUpdate
func (bw *BwSharing) ProcessActiveScenarioUpdate() {
	// Retrieve active scenario from DB
	jsonScenario, err := bw.rcCtrlEng.JSONGetEntry(moduleCtrlEngine+":"+typeActive, ".")
	if err != nil {
		log.Error(err.Error())
		bw.StopScenario()
		//flush existing metrics entrics in the DB
		bw.rcCtrlEng.DBFlush(moduleMetrics)
		return
	}
	// Unmarshal Active scenario
	var scenario ceModel.Scenario
	err = json.Unmarshal([]byte(jsonScenario), &scenario)
	if err != nil {
		log.Error(err.Error())
		bw.StopScenario()
		return
	}

	// Parse scenario
	if bw.isStarted {
		bw.mutex.Lock()
		bw.isReady = false
		parseScenario(scenario)
		bw.bwAlgo.parseScenario(scenario)
		bw.isReady = true
		bw.mutex.Unlock()
	}
}

// updateFilter - Updates the filters in the DB that will be pushed to the sidecars
func (bw *BwSharing) updateFilter(dst string, src string, value float64) {
	bw.updateFilterCB(dst, src, value)
}

// applyFilter - Send notifications to apply the filters stored in the DB for the sidecars
func (bw *BwSharing) applyFilter() {
	bw.applyFilterCB()
// StopScenario
func (bw *BwSharing) StopScenario() {
	var emptyScenario ceModel.Scenario
	bw.bwAlgo.parseScenario(emptyScenario)
}

// UpdateControls - Update all the different configurations attributes based on the content of the DB for dynamic updates
@@ -160,9 +186,6 @@ func (bw *BwSharing) getControlsEntryHandler(key string, fields map[string]strin
	actionName := ""
	tickerPeriod := defaultTickerPeriod
	logVerbose := false
	enableTier1 := false
	enableTier2 := false
	enableTier3 := false

	for fieldName, fieldValue := range fields {
		switch fieldName {
@@ -178,16 +201,13 @@ func (bw *BwSharing) getControlsEntryHandler(key string, fields map[string]strin
				logVerbose = true
			}
		default:
			updateDefaultConfigAttributes(fieldName, fieldValue)
		}
		bw.bwAlgo.updateDefaultConfigAttributes(fieldName, fieldValue)
	}

	bw.config.Action = actionName
	bw.config.RecalculationPeriod = tickerPeriod
	bw.config.LogVerbose = logVerbose
	bw.config.EnableTier1 = enableTier1
	bw.config.EnableTier2 = enableTier2
	bw.config.EnableTier3 = enableTier3

	//for debug
	bw.ApplyAction()
@@ -212,14 +232,18 @@ func (bw *BwSharing) ApplyAction() (err error) {
	return nil
}

// IsRunning()
func (bw *BwSharing) IsRunning() bool {
	return bw.isStarted
}

// Start - starts bwSharing distribution calculations
func (bw *BwSharing) Start() (err error) {
	bw.isStarted = true
	bw.isReady = true
	bw.ticker = time.NewTicker(time.Duration(bw.config.RecalculationPeriod) * time.Millisecond)

	allocateBandwidthSharing()
	//bw.ParseScenarioUpdate()
	bw.bwAlgo.allocateBandwidthSharing()
	go func() {
		for range bw.ticker.C {

@@ -227,7 +251,7 @@ func (bw *BwSharing) Start() (err error) {
			if bw.isReady {
				bw.mutex.Lock()
				bw.isReady = false
				tickerFunction(bw.rcCtrlEng, bw.config.LogVerbose, bw.updateFilterCB, bw.applyFilterCB)
				bw.bwAlgo.tickerFunction()
				bw.isReady = true
				bw.mutex.Unlock()
			}
@@ -244,6 +268,6 @@ func (bw *BwSharing) Stop() {
		log.Debug("BwSharing computation stopped ", bw.name)
		bw.isStarted = false
		bw.isReady = false
		cleanUp()
		bw.bwAlgo.deallocateBandwidthSharing()
	}
}
+43 −0
Original line number Diff line number Diff line
@@ -17,42 +17,27 @@
package bws

import (
	"fmt"
	"testing"
	"time"

        redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
        ceModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model"

)

const redisAddr string = "localhost:30379"

//default allocations
func allocateBandwidthSharing() {
	//default allocations
}

//updates attributes through DB
func updateDefaultConfigAttributes(fieldName string, fieldValue string) {

        switch(fieldName) {
	default:
	}
}

//periodic function invoked
func tickerFunction(rcCtrlEng *redis.Connector, logVerbose bool, updateFilter func(string, string, float64), applyFilter func()) {

}
func TestBwsharingBasic(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

//reseting previous allocations
func cleanUp() {
}
	bwSharing, err := NewBwSharing("test", redisAddr, nil, nil)
	if err != nil {
		t.Errorf("Failed to create a bwSharing object.")
	} else {
		bwSharing.UpdateControls()
		_ = bwSharing.Start()

//default init values if not configured
func initDefaultConfigAttributes() {
		time.Sleep(1000 * time.Millisecond)
		bwSharing.Stop()
	}

//scenario parsing function
func parseScenario(scenario ceModel.Scenario) {
        log.Debug("parseScenario")
}
Loading