Commit 31099b5b authored by Simon Pastor's avatar Simon Pastor
Browse files

pr review - moved metricStore to sbi

parent f3dfc29b
Loading
Loading
Loading
Loading
+52 −9
Original line number Diff line number Diff line
@@ -23,20 +23,25 @@ import (
	dataModel "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-model"
	gc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-gis-cache"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
)

const moduleName string = "meep-wais-sbi"

var metricStore *met.MetricStore
var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"

type SbiCfg struct {
	SandboxName    string
	RedisAddr      string
	PostgisHost    string
	PostgisPort    string
	StaInfoCb      func(string, string, string, *int32, []string)
	StaInfoCb      func(string, string, string, *int32, *int32, *int32)
	ApInfoCb       func(string, string, *float32, *float32, []string)
	ScenarioNameCb func(string)
	ScenarioNameCb func(string, *bool)
	CleanUpCb      func()
}

@@ -47,9 +52,9 @@ type WaisSbi struct {
	activeModel             *mod.Model
	gisCache                *gc.GisCache
	refreshTicker           *time.Ticker
	updateStaInfoCB         func(string, string, string, *int32, []string)
	updateStaInfoCB         func(string, string, string, *int32, *int32, *int32)
	updateAccessPointInfoCB func(string, string, *float32, *float32, []string)
	updateScenarioNameCB    func(string)
	updateScenarioNameCB    func(string, *bool)
	cleanUpCB               func()
	mutex                   sync.Mutex
}
@@ -176,6 +181,29 @@ func processActiveScenarioTerminate() {
	sbi.cleanUpCB()
}

func getAppSumUlDl(apps []string) (float32, float32) {
	sumUl := 0.0
	sumDl := 0.0
	//var appNames []string
	for _, appName := range apps {
		//appNames = append(appNames, process.Name)
		if metricStore != nil {
			metricsArray, err := metricStore.GetCachedNetworkMetrics("*", appName)
			if err != nil {
				log.Error("Failed to get network metric:", err)
			}

			//downlink for the app is uplink for the UE, and vice-versa
			for _, metrics := range metricsArray {
				sumUl += metrics.DlTput
				sumDl += metrics.UlTput
			}
		}
	}

	return float32(sumUl), float32(sumDl)
}

func processActiveScenarioUpdate() {

	sbi.mutex.Lock()
@@ -195,8 +223,17 @@ func processActiveScenarioUpdate() {
	sbi.activeModel.UpdateScenario()

	scenarioName := sbi.activeModel.GetScenarioName()
	sbi.updateScenarioNameCB(scenarioName)
	var update bool
	sbi.updateScenarioNameCB(scenarioName, &update)

	// Connect to Metric Store
	if update {
		var err error
		metricStore, err = met.NewMetricStore(scenarioName, sbi.sandboxName, influxAddr, redisAddr)
		if err != nil {
			log.Error("Failed connection to metric-store: ", err)
		}
	}
	// Get all POA positions & UE measurments
	poaPositionMap, _ := sbi.gisCache.GetAllPositions(gc.TypePoa)
	ueMeasMap, _ := sbi.gisCache.GetAllMeasurements()
@@ -231,7 +268,10 @@ func processActiveScenarioUpdate() {
				appNames = append(appNames, process.Name)
			}

			sbi.updateStaInfoCB(name, ue.MacId, apMacId, rssi, appNames)
			sumUl, sumDl := getAppSumUlDl(appNames)
			sumUlKbps := int32(sumUl * 1000)
			sumDlKbps := int32(sumDl * 1000)
			sbi.updateStaInfoCB(name, ue.MacId, apMacId, rssi, &sumUlKbps, &sumDlKbps)
		}
	}

@@ -245,7 +285,7 @@ func processActiveScenarioUpdate() {
			}
		}
		if !found {
			sbi.updateStaInfoCB(prevUeName, "", "", nil, nil)
			sbi.updateStaInfoCB(prevUeName, "", "", nil, nil, nil)
			log.Info("Ue removed : ", prevUeName)
		}
	}
@@ -341,7 +381,10 @@ func refreshMeasurements() {
				appNames = append(appNames, process.Name)
			}

			sbi.updateStaInfoCB(name, ue.MacId, apMacId, rssi, appNames)
			sumUl, sumDl := getAppSumUlDl(appNames)
			sumUlKbps := int32(sumUl * 1000)
			sumDlKbps := int32(sumDl * 1000)
			sbi.updateStaInfoCB(name, ue.MacId, apMacId, rssi, &sumUlKbps, &sumDlKbps)
		}
	}
}
+13 −35
Original line number Diff line number Diff line
@@ -53,8 +53,6 @@ const (
	notifTest        = "TestNotification"
)

var metricStore *met.MetricStore

var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"

@@ -94,7 +92,6 @@ type ApInfoComplete struct {

type StaData struct {
	StaInfo *StaInfo `json:"staInfo"`
	AppNames []string `json:"appNames"`
}

type StaInfoResp struct {
@@ -198,7 +195,7 @@ func Stop() (err error) {
	return sbi.Stop()
}

func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, appNames []string) {
func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, sumUl *int32, sumDl *int32) {

	// Get STA Info from DB, if any
	var staData *StaData
@@ -207,25 +204,13 @@ func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, ap
		staData = convertJsonToStaData(jsonStaData)
	}

	//update data rates
	sumUl := 0.0
	sumDl := 0.0
	for _, appName := range appNames {

		metricsArray, err := metricStore.GetCachedNetworkMetrics("*", appName)
		if err != nil {
			log.Error("Failed to get network metric:", err)
		}

		//downlink for the app is uplink for the UE, and vice-versa
		for _, metrics := range metricsArray {
			sumUl += metrics.DlTput
			sumDl += metrics.UlTput
	var dataRate StaDataRate
	if sumDl != nil {
		dataRate.StaLastDataDownlinkRate = *sumDl //kbps
	}
	if sumUl != nil {
		dataRate.StaLastDataUplinkRate = *sumUl //kbps
	}
	var dataRate StaDataRate
	dataRate.StaLastDataDownlinkRate = int32(sumDl * 1000) //kbps
	dataRate.StaLastDataUplinkRate = int32(sumUl * 1000)   //kbps

	// Update DB if STA Info does not exist or has changed
	if staData == nil || isStaInfoUpdateRequired(staData.StaInfo, ownMacId, apMacId, rssi, &dataRate) {
@@ -258,7 +243,6 @@ func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, ap
		}
		dataRate.StaId = staData.StaInfo.StaId
		staData.StaInfo.StaDataRate = &dataRate
		staData.AppNames = appNames

		_ = rc.JSONSetEntry(baseKey+"UE:"+name, ".", convertStaDataToJson(staData))
	}
@@ -1295,21 +1279,15 @@ func cleanUp() {
	staDataRateSubscriptionMap = map[int]*StaDataRateSubscription{}

	subscriptionExpiryMap = map[int][]int{}
	updateStoreName("")
	var update bool
	updateStoreName("", &update)
}

func updateStoreName(storeName string) {
func updateStoreName(storeName string, update *bool) {
	if currentStoreName != storeName {
		currentStoreName = storeName
		_ = httpLog.ReInit(logModuleWAIS, sandboxName, storeName, redisAddr, influxAddr)

		// Connect to Metric Store
		var err error
		metricStore, err = met.NewMetricStore(storeName, sandboxName, influxAddr, redisAddr)
		if err != nil {
			log.Error("Failed connection to metric-store: ", err)
			return
		}

		*update = true
	}
	*update = false
}
+2 −2
Original line number Diff line number Diff line
@@ -6447,8 +6447,8 @@ func TestSbi(t *testing.T) {

	var expectedStaDataStr [2]string
	var expectedStaData [2]StaData
	expectedStaData[INITIAL] = StaData{&StaInfo{StaDataRate: &StaDataRate{StaId: &StaIdentity{MacId: ""}}, StaId: &StaIdentity{MacId: ""}}, nil}
	expectedStaData[UPDATED] = StaData{&StaInfo{StaDataRate: &StaDataRate{StaId: &StaIdentity{MacId: ueMacId}}, StaId: &StaIdentity{MacId: ueMacId}, ApAssociated: &ApAssociated{Bssid: apMacId2}}, nil}
	expectedStaData[INITIAL] = StaData{&StaInfo{StaDataRate: &StaDataRate{StaId: &StaIdentity{MacId: ""}}, StaId: &StaIdentity{MacId: ""}}}
	expectedStaData[UPDATED] = StaData{&StaInfo{StaDataRate: &StaDataRate{StaId: &StaIdentity{MacId: ueMacId}}, StaId: &StaIdentity{MacId: ueMacId}, ApAssociated: &ApAssociated{Bssid: apMacId2}}}

	var expectedApInfoApIdMacIdStr [2]string
	var expectedApInfoNbStas [2]int