Commit c52bbf25 authored by Simon Pastor's avatar Simon Pastor
Browse files

1st part of pr review

parent 6cbbc204
Loading
Loading
Loading
Loading
+29 −12
Original line number Diff line number Diff line
@@ -30,6 +30,7 @@ import (
	am "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-gis-asset-mgr"
	gc "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-gis-cache"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
	sbox "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-ctrl-client"
@@ -88,6 +89,8 @@ type GisEngine struct {
	sboxCtrlClient   *sbox.APIClient
	activeModel      *mod.Model
	gisCache         *gc.GisCache
	metricStore      *met.MetricStore
	storeName        string
	assetMgr         *am.AssetMgr
	assets           map[string]*Asset
	ueInfo           map[string]*UeInfo
@@ -264,12 +267,25 @@ func processScenarioActivate() {
	updateCache()

	// Start snapshot thread
	if ge.activeModel.GetScenarioName() != "" {
	scenarioName := ge.activeModel.GetScenarioName()
	if scenarioName != "" {
		err := ge.StartSnapshotThread()

		if ge.storeName != scenarioName {
			ge.storeName = scenarioName
			// Connect to Metric Store
			ge.metricStore, err = met.NewMetricStore(ge.storeName, ge.sandboxName, influxAddr, redisAddr)
			if err != nil {
			log.Error("Failed to start snapshot thread: " + err.Error())
				log.Error("Failed connection to metric-store: ", err)
				return
			}
		} else {
			if err != nil {
				log.Error("Failed to start snapshot thread: " + err.Error())
				return
			}
			/*else {

				// Connect to GIS cache
				err = ge.gisCache.UpdateGisCacheInflux(ge.sandboxName, ge.activeModel.GetScenarioName(), influxAddr)
				if err != nil {
@@ -278,6 +294,8 @@ func processScenarioActivate() {
					log.Info("Connected to GIS Cache")
				}
			}
			*/
		}
	}
}

@@ -1331,9 +1349,8 @@ func (ge *GisEngine) StartSnapshotThread() error {
	ge.snapshotTicker = time.NewTicker(time.Second)
	go func() {
		for range ge.snapshotTicker.C {
			if ge.gisCache != nil {

				ge.gisCache.TakeUeMetricSnapshot()
			if ge.metricStore != nil {
				ge.metricStore.TakeUeMetricSnapshot()
			}
		}
	}()
+4 −9
Original line number Diff line number Diff line
@@ -20,16 +20,13 @@ import (
	"fmt"
	"strconv"
	"strings"
	"time"
//	"time"

	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	influx "github.com/influxdata/influxdb1-client/v2"
)

const defaultInfluxDBAddr = "http://meep-influxdb.default.svc.cluster.local:8086"
const dbMaxRetryCount = 2
const redisTable = 0

const (
@@ -75,8 +72,6 @@ type Measurement struct {

type GisCache struct {
	rc           *redis.Connector
	influxName   string
	influxClient *influx.Client
	baseKey      string
}

@@ -99,7 +94,7 @@ func NewGisCache(sandboxName string, redisAddr string) (gc *GisCache, err error)
	log.Info("Created GIS Cache")
	return gc, nil
}

/*
// UpdateGisCacheInflux - Creates and initialize an Influx DB for a GIS Cache instance
func (gc *GisCache) UpdateGisCacheInflux(sandboxName string, scenarioName string, influxAddr string) (err error) {
	// Connect to Influx DB
@@ -152,7 +147,7 @@ func (gc *GisCache) connectInfluxDB(addr string) (*influx.Client, error) {
	log.Info("InfluxDB Connector connected to ", addr, " version: ", version)
	return &client, nil
}

*/
// SetPosition - Create or update entry in DB
func (gc *GisCache) SetPosition(typ string, name string, position *Position) error {
	key := gc.baseKey + posKey + typ + ":" + name
@@ -313,5 +308,5 @@ func (gc *GisCache) DelMeasurement(ue string, poa string) {
func (gc *GisCache) Flush() {
	gc.rc.DBFlush(gc.baseKey)

	gc.FlushInfluxDb()
//	gc.FlushInfluxDb()
}
+43 −38
Original line number Diff line number Diff line
@@ -14,15 +14,15 @@
 * limitations under the License.
 */

package giscache
package metrics

import (
	"errors"
	"strconv"
	"strings"
//	"strconv"
//	"strings"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	influx "github.com/influxdata/influxdb1-client/v2"
//	influx "github.com/influxdata/influxdb1-client/v2"
)

const UeMetName = "meas"
@@ -40,12 +40,12 @@ const UeMetRsrq = "rsrq"
const UeMetDistance = "dist"
const UeMetTime = "time"

type Metric struct {
/*type Metric struct {
	Name   string
	Tags   map[string]string
	Fields map[string]interface{}
}

*/
type UeMetric struct {
	Src      string
	SrcType  string
@@ -57,7 +57,7 @@ type UeMetric struct {
	Rsrq     int32
	Distance float64
}

/*
// SetInfluxMetric - Generic metric setter
func (gc *GisCache) SetInfluxMetric(metricList []Metric) error {
	if gc.influxClient == nil {
@@ -167,8 +167,8 @@ func (gc *GisCache) GetInfluxMetric(metric string, tags map[string]string, field

	return values, nil
}

func (gc *GisCache) formatCachedUeMetric(values map[string]interface{}) (metric UeMetric, err error) {
*/
func (ms *MetricStore) formatCachedUeMetric(values map[string]interface{}) (metric UeMetric, err error) {
	var ok bool
	var val interface{}

@@ -220,23 +220,29 @@ func (gc *GisCache) formatCachedUeMetric(values map[string]interface{}) (metric
}

// GetRedisMetric - Generic metric getter
func (gc *GisCache) GetRedisMetric(metric string, tagStr string) (values []map[string]interface{}, err error) {
	if gc.rc == nil {
		err = errors.New("Redis metrics DB not accessible")
func (ms *MetricStore) getGisCacheRedisMetric(metric string, tagStr string) (values []map[string]interface{}, err error) {

        if ms.name == "" {
                err := errors.New("Store name not specified")
                return values, err
        }
        if ms.redisClient == nil {
                err = errors.New("Redis metrics DB disabled")
                return values, err
        }

	// Get latest metrics
	key := gc.baseKey + metric + ":" + tagStr
	//key := gc.baseKey + metric + ":" + tagStr
	key := ms.baseKeyRef + "gis-cache:" + metric + ":" + tagStr

	err = gc.rc.ForEachEntry(key, gc.getMetricsEntryHandler, &values)
	err = ms.redisClient.ForEachEntry(key, ms.getMetricsEntryHandler, &values)
	if err != nil {
		log.Error("Failed to get entries: ", err)
		return nil, err
	}
	return values, nil
}

/*
func (gc *GisCache) getMetricsEntryHandler(key string, fields map[string]string, userData interface{}) error {
	// Retrieve field values
	values := make(map[string]interface{})
@@ -249,25 +255,23 @@ func (gc *GisCache) getMetricsEntryHandler(key string, fields map[string]string,

	return nil
}

func (gc *GisCache) TakeUeMetricSnapshot() {
*/
func (ms *MetricStore) TakeUeMetricSnapshot() {
	// start = time.Now()

	// Get all cached metrics
	valuesArray, err := gc.GetRedisMetric(UeMetName, "*")
	valuesArray, err := ms.getGisCacheRedisMetric(UeMetName, "*")
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// logTimeLapse("GetRedisMetric wildcard")

	// Prepare ue metrics list
	metricSignalList := make([]Metric, len(valuesArray))
	metricDistanceList := make([]Metric, len(valuesArray))
	for index, values := range valuesArray {
		// Format network metric
		nm, err := gc.formatCachedUeMetric(values)
		nm, err := ms.formatCachedUeMetric(values)
		if err != nil {
			continue
		}
@@ -291,19 +295,19 @@ func (gc *GisCache) TakeUeMetricSnapshot() {
	}

	// Store metrics in influx
	err = gc.SetInfluxMetric(metricSignalList)
	err = ms.SetInfluxMetric(metricSignalList)
	if err != nil {
		log.Error("Fail to write influx metrics with error: ", err.Error())
	}
	// Store metrics in influx
	err = gc.SetInfluxMetric(metricDistanceList)
	err = ms.SetInfluxMetric(metricDistanceList)
	if err != nil {
		log.Error("Fail to write influx metrics with error: ", err.Error())
	}

	// logTimeLapse("Write to Influx")
}

/*
// CreateInfluxDb -
func (gc *GisCache) CreateInfluxDb() error {

@@ -326,16 +330,17 @@ func (gc *GisCache) CreateInfluxDb() error {

	return nil
}

*/
// FlushInfluxDb -
func (gc *GisCache) FlushInfluxDb() {
	// Flush Influx DB
	if gc.influxClient != nil {
		q := influx.NewQuery("DROP SERIES FROM /.*/", gc.influxName, "")
		response, err := (*gc.influxClient).Query(q)
		if err != nil {
			log.Error("Query failed with error: ", err.Error())
		}
		log.Info(response.Results)
	}
}
//func (gc *GisCache) FlushInfluxDb() {
//	// Flush Influx DB
//	if gc.influxClient != nil {
//		q := influx.NewQuery("DROP SERIES FROM /.*/", gc.influxName, "")
//		response, err := (*gc.influxClient).Query(q)
//		if err != nil {
//			log.Error("Query failed with error: ", err.Error())
//		}
//		log.Info(response.Results)
//	}
//}
+2 −0
Original line number Diff line number Diff line
@@ -50,6 +50,7 @@ type MetricStore struct {
	name           string
	namespace      string
	baseKey        string
        baseKeyRef     string
	addr           string
	influxClient   *influx.Client
	redisClient    *redis.Connector
@@ -67,6 +68,7 @@ func NewMetricStore(name string, namespace string, influxAddr string, redisAddr
	// Create new Metric Store instance
	ms = new(MetricStore)
	ms.namespace = namespace
        ms.baseKeyRef = dkm.GetKeyRoot(namespace)
	ms.baseKey = dkm.GetKeyRoot(namespace) + metricsKey

	// Connect to Redis DB