Commit d857756a authored by Simon Pastor's avatar Simon Pastor
Browse files

influx first merge

parent 77112896
Loading
Loading
Loading
Loading
+51 −1
Original line number Diff line number Diff line
@@ -39,6 +39,7 @@ import (
const serviceName = "GIS Engine"
const moduleName = "meep-gis-engine"
const redisAddr = "meep-redis-master.default.svc.cluster.local:6379"
const influxAddr = "http://meep-influxdb.default.svc.cluster.local:8086"
const sboxCtrlBasepath = "http://meep-sandbox-ctrl/sandbox-ctrl/v1"
const postgisUser = "postgres"
const postgisPwd = "pwd"
@@ -93,6 +94,7 @@ type GisEngine struct {
	automation       map[string]bool
	automationTicker *time.Ticker
	updateTime       time.Time
	snapshotTicker   *time.Ticker
	mutex            sync.Mutex
}

@@ -260,6 +262,23 @@ func processScenarioActivate() {

	// Update Gis cache
	updateCache()

	// Start snapshot thread
	if ge.activeModel.GetScenarioName() != "" {
		err := ge.StartSnapshotThread()
		if err != nil {
			log.Error("Failed to start snapshot thread: " + err.Error())
			return
		} else {
			// Connect to GIS cache
			err = ge.gisCache.UpdateGisCacheInflux(ge.sandboxName, ge.activeModel.GetScenarioName(), influxAddr)
			if err != nil {
				log.Error("Failed to GIS Cache: ", err.Error())
			} else {
				log.Info("Connected to GIS Cache")
			}
		}
	}
}

func processScenarioUpdate() {
@@ -293,6 +312,9 @@ func processScenarioTerminate() {
	// Sync with active scenario store
	ge.activeModel.UpdateScenario()

	// Stop snapshot thread
	ge.StopSnapshotThread()

	// Stop automation
	resetAutomation()

@@ -797,7 +819,8 @@ func updateCache() {
				measurement.Rssi = ueMeas.Rssi
				measurement.Rsrp = ueMeas.Rsrp
				measurement.Rsrq = ueMeas.Rsrq
				_ = ge.gisCache.SetMeasurement(ue.Name, ueMeas.Poa, measurement)
				measurement.Distance = ueMeas.Distance
				_ = ge.gisCache.SetMeasurement(ue.Name, ueMeas.SubType, ueMeas.Poa, measurement)
			}
		}
	}
@@ -1297,3 +1320,30 @@ func geUpdateGeoDataByName(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusOK)
}

func (ge *GisEngine) StartSnapshotThread() error {
	// Make sure ticker is not already running
	if ge.snapshotTicker != nil {
		return errors.New("ticker already running")
	}

	// Create new ticker and start snapshot thread
	ge.snapshotTicker = time.NewTicker(time.Second)
	go func() {
		for range ge.snapshotTicker.C {
			if ge.gisCache != nil {

				ge.gisCache.TakeUeMetricSnapshot()
			}
		}
	}()

	return nil
}

func (ge *GisEngine) StopSnapshotThread() {
	if ge.snapshotTicker != nil {
		ge.snapshotTicker.Stop()
		ge.snapshotTicker = nil
	}
}
+50 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2021  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package giscache

import (
	"encoding/json"
	"strconv"
)

func JsonNumToInt32(num json.Number) (val int32) {
	if intVal, err := strconv.Atoi(num.String()); err == nil {
		val = int32(intVal)
	}
	return val
}

func JsonNumToFloat64(num json.Number) (val float64) {
	if floatVal, err := num.Float64(); err == nil {
		val = floatVal
	}
	return val
}

func StrToInt32(str string) (val int32) {
	if intVal, err := strconv.Atoi(str); err == nil {
		val = int32(intVal)
	}
	return val
}

func StrToFloat64(str string) (val float64) {
	if floatVal, err := strconv.ParseFloat(str, 64); err == nil {
		val = floatVal
	}
	return val
}
+78 −3
Original line number Diff line number Diff line
@@ -20,12 +20,16 @@ import (
	"fmt"
	"strconv"
	"strings"
	"time"

	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	influx "github.com/influxdata/influxdb1-client/v2"
)

const defaultInfluxDBAddr = "http://meep-influxdb.default.svc.cluster.local:8086"
const dbMaxRetryCount = 2
const redisTable = 0

const (
@@ -35,11 +39,15 @@ const (
)

const (
	fieldDistance = "distance"
	fieldLatitude  = "lat"
	fieldLongitude = "long"
	fieldPoa = "poa"
	fieldPoaType = "poatype"
	fieldRssi      = "rssi"
	fieldRsrp      = "rsrp"
	fieldRsrq      = "rsrq"
        fieldSrc = "src"
)

const (
@@ -61,10 +69,13 @@ type Measurement struct {
	Rssi float32
	Rsrp float32
	Rsrq float32
	Distance float32
}

type GisCache struct {
	rc           *redis.Connector
	influxName   string
	influxClient *influx.Client
	baseKey      string
}

@@ -88,6 +99,59 @@ func NewGisCache(sandboxName string, redisAddr string) (gc *GisCache, err error)
	return gc, nil
}

// UpdateGisCacheInflux - Creates and initialize an Influx DB for a GIS Cache instance
func (gc *GisCache) UpdateGisCacheInflux(sandboxName string, scenarioName string, influxAddr string) (err error) {
        // Connect to Influx DB
        for retry := 0; gc.influxClient == nil && retry <= dbMaxRetryCount; retry++ {
                gc.influxClient, err = gc.connectInfluxDB(influxAddr)
                if err != nil {
                        log.Warn("Failed to connect to InfluxDB. Retrying... Error: ", err)
                }
        }
        if err != nil {
                return err
        }
        log.Info("Connected to GIS Cache Influx DB")

        influxName := sandboxName
        if scenarioName != "" {
                influxName = influxName + "_" + scenarioName
        }

        gc.influxName = strings.Replace(influxName, "-", "_", -1)

        err = gc.CreateInfluxDb()
	if err != nil {
		log.Info("Error in creating influx db")
	}

	//nil or not
	return err
}

func (gc *GisCache) connectInfluxDB(addr string) (*influx.Client, error) {
	if addr == "" {
		addr = defaultInfluxDBAddr
	}
	log.Debug("InfluxDB Connector connecting to ", addr)

	client, err := influx.NewHTTPClient(influx.HTTPConfig{Addr: addr, InsecureSkipVerify: true})
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", addr)
		return nil, err
	}
	defer client.Close()

	_, version, err := client.Ping(1000 * time.Millisecond)
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", addr)
		return nil, err
	}

	log.Info("InfluxDB Connector connected to ", addr, " version: ", version)
	return &client, nil
}

// SetPosition - Create or update entry in DB
func (gc *GisCache) SetPosition(typ string, name string, position *Position) error {
	key := gc.baseKey + posKey + typ + ":" + name
@@ -153,14 +217,19 @@ func (gc *GisCache) DelPosition(typ string, name string) {
}

// SetMeasurement - Create or update entry in DB
func (gc *GisCache) SetMeasurement(ue string, poa string, meas *Measurement) error {
func (gc *GisCache) SetMeasurement(ue string, poaType string, poa string, meas *Measurement) error {
	key := gc.baseKey + measKey + ue + ":" + poa

	// Prepare data
	fields := make(map[string]interface{})
        fields[fieldSrc] = fmt.Sprintf("%s", ue)
        fields[fieldPoa] = fmt.Sprintf("%s", poa)
        fields[fieldPoaType] = fmt.Sprintf("%s", poaType)
	fields[fieldRssi] = fmt.Sprintf("%f", meas.Rssi)
	fields[fieldRsrp] = fmt.Sprintf("%f", meas.Rsrp)
	fields[fieldRsrq] = fmt.Sprintf("%f", meas.Rsrq)
        fields[fieldDistance] = fmt.Sprintf("%f", meas.Distance)


	// Update entry in DB
	err := gc.rc.SetEntry(key, fields)
@@ -215,6 +284,10 @@ func getMeasurement(key string, fields map[string]string, userData interface{})
	if rsrq, err := strconv.ParseFloat(fields[fieldRsrq], 32); err == nil {
		meas.Rsrq = float32(rsrq)
	}
        if distance, err := strconv.ParseFloat(fields[fieldDistance], 32); err == nil {
                meas.Distance = float32(distance)
        }


	// Add measurement to map
	ueMeas, found := measurementMap[ueName]
@@ -239,4 +312,6 @@ func (gc *GisCache) DelMeasurement(ue string, poa string) {
// Flush - Remove all GIS cache entries
func (gc *GisCache) Flush() {
	gc.rc.DBFlush(gc.baseKey)

	gc.FlushInfluxDb()
}
+319 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2021  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package giscache

import (
	"errors"
	"strconv"
	"strings"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	influx "github.com/influxdata/influxdb1-client/v2"
)

const UeMetName = "meas"
const UeMetNameInflux = "gis"
const UeMetSrc = "src"
const UeMetPoa = "poa"
const UeMetPoaType = "poatype"
const UeMetRssi = "rssi"
const UeMetRsrp = "rsrp"
const UeMetRsrq = "rsrq"
const UeMetDistance = "distance"
const UeMetTime = "time"

type Metric struct {
	Name   string
	Tags   map[string]string
	Fields map[string]interface{}
}

type UeMetric struct {
	Src      string
	Poa      string
	PoaType  string
	Time     interface{}
	Rssi     int32
	Rsrp     int32
	Rsrq     int32
	Distance float64
}

// SetInfluxMetric - Generic metric setter
func (gc *GisCache) SetInfluxMetric(metricList []Metric) error {
	if gc.influxClient == nil {
		return errors.New("Not connected to Influx DB")
	}

	// Create a new point batch
	bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:  gc.influxName,
		Precision: "ns",
	})

	// Create & add points to batch
	for _, metric := range metricList {
		pt, err := influx.NewPoint(metric.Name, metric.Tags, metric.Fields)
		if err != nil {
			log.Error("Failed to create point with error: ", err)
			return err
		}
		bp.AddPoint(pt)
	}

	// Write the batch
	err := (*gc.influxClient).Write(bp)
	if err != nil {
		log.Error("Failed to write point with error: ", err)
		return err
	}
	return nil
}

// GetInfluxMetric - Generic metric getter
func (gc *GisCache) GetInfluxMetric(metric string, tags map[string]string, fields []string, duration string, count int) (values []map[string]interface{}, err error) {
	if gc.influxClient == nil {
		return values, errors.New("Not connected to Influx DB")
	}

	// Create query

	// Fields
	fieldStr := ""
	for _, field := range fields {
		if fieldStr == "" {
			fieldStr = field
		} else {
			fieldStr += "," + field
		}
	}
	if fieldStr == "" {
		fieldStr = "*"
	}

	// Tags
	tagStr := ""
	for k, v := range tags {
		mv := strings.Split(v, ",")

		if tagStr == "" {
			tagStr = " WHERE (" // + k + "='" + v + "'"
		} else {
			tagStr += " AND (" //+ k + "='" + v + "'"
		}
		for i, v := range mv {
			if i != 0 {
				tagStr += " OR "
			}
			tagStr += k + "='" + v + "'"
		}
		tagStr += ")"
	}
	if duration != "" {
		if tagStr == "" {
			tagStr = " WHERE time > now() - " + duration
		} else {
			tagStr += " AND time > now() - " + duration
		}
	}

	// Count
	countStr := ""
	if count != 0 {
		countStr = " LIMIT " + strconv.Itoa(count)
	}

	query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc" + countStr
	log.Debug("QUERY: ", query)

	// Query store for metric
	q := influx.NewQuery(query, gc.influxName, "")
	response, err := (*gc.influxClient).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		return values, err
	}

	// Process response
	if len(response.Results) > 0 && len(response.Results[0].Series) > 0 {
		row := response.Results[0].Series[0]
		for _, qValues := range row.Values {
			rValues := make(map[string]interface{})
			for index, qVal := range qValues {
				rValues[row.Columns[index]] = qVal
			}
			values = append(values, rValues)
		}
	}

	return values, nil
}

func (gc *GisCache) formatCachedUeMetric(values map[string]interface{}) (metric UeMetric, err error) {
	var ok bool
	var val interface{}

	// Process field values
	if val, ok = values[UeMetSrc]; !ok {
		val = ""
	}
	metric.Src = val.(string)

	if val, ok = values[UeMetPoa]; !ok {
		val = ""
	}
	metric.Poa = val.(string)

	if val, ok = values[UeMetPoaType]; !ok {
		val = ""
	}
	metric.PoaType = val.(string)

	if val, ok = values[UeMetRssi]; !ok {
		val = ""
	}
	rssi := StrToFloat64(val.(string))
	metric.Rssi = int32(rssi)

	if val, ok = values[UeMetRsrp]; !ok {
		val = ""
	}
	rsrp := StrToFloat64(val.(string))
	metric.Rsrp = int32(rsrp)

	if val, ok = values[UeMetRsrq]; !ok {
		val = ""
	}
	rsrq := StrToFloat64(val.(string))
	metric.Rsrq = int32(rsrq)

	if val, ok = values[UeMetDistance]; !ok {
		val = ""
	}
	metric.Distance = StrToFloat64(val.(string))

	return metric, nil
}

// GetRedisMetric - Generic metric getter
func (gc *GisCache) GetRedisMetric(metric string, tagStr string) (values []map[string]interface{}, err error) {
	if gc.rc == nil {
		err = errors.New("Redis metrics DB not accessible")
		return values, err
	}

	// Get latest metrics
	key := gc.baseKey + metric + ":" + tagStr

	err = gc.rc.ForEachEntry(key, gc.getMetricsEntryHandler, &values)
	if err != nil {
		log.Error("Failed to get entries: ", err)
		return nil, err
	}
	return values, nil
}

func (gc *GisCache) getMetricsEntryHandler(key string, fields map[string]string, userData interface{}) error {
	// Retrieve field values
	values := make(map[string]interface{})
	for k, v := range fields {
		values[k] = v
	}
	// Append values list to data
	data := userData.(*[]map[string]interface{})
	*data = append(*data, values)

	return nil
}

func (gc *GisCache) TakeUeMetricSnapshot() {
	// start = time.Now()

	// Get all cached metrics
	valuesArray, err := gc.GetRedisMetric(UeMetName, "*")
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// logTimeLapse("GetRedisMetric wildcard")

	// Prepare ue metrics list
	metricList := make([]Metric, len(valuesArray))
	for index, values := range valuesArray {
		// Format network metric
		nm, err := gc.formatCachedUeMetric(values)
		if err != nil {
			continue
		}

		// Add metric to list
		metric := &metricList[index]
		metric.Name = UeMetNameInflux
		metric.Tags = map[string]string{UeMetSrc: nm.Src, UeMetPoa: nm.Poa, UeMetPoaType: nm.PoaType}
		metric.Fields = map[string]interface{}{
			UeMetRssi:     nm.Rssi,
			UeMetRsrp:     nm.Rsrp,
			UeMetRsrq:     nm.Rsrq,
			UeMetDistance: nm.Distance,
		}
	}

	// Store metrics in influx
	err = gc.SetInfluxMetric(metricList)
	if err != nil {
		log.Error("Fail to write influx metrics with error: ", err.Error())
	}

	// logTimeLapse("Write to Influx")
}

// CreateInfluxDb -
func (gc *GisCache) CreateInfluxDb() error {

        if gc.influxName != "" {

                // Create new DB if necessary
                if gc.influxClient != nil {
                        q := influx.NewQuery("CREATE DATABASE "+ gc.influxName, "", "")
                        _, err := (*gc.influxClient).Query(q)
                        if err != nil {
                                log.Error("Query failed with error: ", err.Error())
                                return err
                        }
                }
        } else {
		log.Error("Nil influxDbName")
	}

	log.Info("Influx database ", gc.influxName, " created")

        return nil
}

// FlushInfluxDb -
func (gc *GisCache) FlushInfluxDb() {
        // Flush Influx DB
        if gc.influxClient != nil {
                q := influx.NewQuery("DROP SERIES FROM /.*/", gc.influxName, "")
                response, err := (*gc.influxClient).Query(q)
                if err != nil {
                        log.Error("Query failed with error: ", err.Error())
                }
                log.Info(response.Results)
        }
}