Loading go-apps/meep-gis-engine/server/gis-engine.go +1 −1 Original line number Diff line number Diff line Loading @@ -1350,7 +1350,7 @@ func (ge *GisEngine) StartSnapshotThread() error { go func() { for range ge.snapshotTicker.C { if ge.metricStore != nil { ge.metricStore.TakeUeMetricSnapshot() ge.metricStore.TakeGisMetricSnapshot() } } }() Loading go-packages/meep-metrics/gis.go +46 −203 Original line number Diff line number Diff line Loading @@ -18,35 +18,26 @@ package metrics import ( "errors" // "strconv" // "strings" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" // influx "github.com/influxdata/influxdb1-client/v2" ) const UeMetName = "meas" const UeMetNameInflux = "gis" const UeMetSrc = "src" const UeMetSrcType = "srcType" const UeMetDest = "dest" const UeMetDestType = "destType" const UeMetMeasType = "measType" const UeMetMeasTypeDistance = "distance" const UeMetMeasTypeSignal = "signal" const UeMetRssi = "rssi" const UeMetRsrp = "rsrp" const UeMetRsrq = "rsrq" const UeMetDistance = "dist" const UeMetTime = "time" /*type Metric struct { Name string Tags map[string]string Fields map[string]interface{} } */ type UeMetric struct { const GisMetName = "meas" const GisMetNameInflux = "gis" const GisMetSrc = "src" const GisMetSrcType = "srcType" const GisMetDest = "dest" const GisMetDestType = "destType" const GisMetMeasType = "measType" const GisMetMeasTypeDistance = "distance" const GisMetMeasTypeSignal = "signal" const GisMetRssi = "rssi" const GisMetRsrp = "rsrp" const GisMetRsrq = "rsrq" const GisMetDistance = "dist" const GisMetTime = "time" type GisMetric struct { Src string SrcType string Dest string Loading @@ -57,161 +48,51 @@ type UeMetric struct { Rsrq int32 Distance float64 } /* // SetInfluxMetric - Generic metric setter func (gc *GisCache) SetInfluxMetric(metricList []Metric) error { if gc.influxClient == nil { return errors.New("Not connected to Influx DB") } // Create a new point batch bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{ Database: gc.influxName, Precision: "ns", }) // Create & add points to batch for _, metric := range metricList { pt, err := influx.NewPoint(metric.Name, metric.Tags, metric.Fields) if err != nil { log.Error("Failed to create point with error: ", err) return err } bp.AddPoint(pt) } // Write the batch err := (*gc.influxClient).Write(bp) if err != nil { log.Error("Failed to write point with error: ", err) return err } return nil } // GetInfluxMetric - Generic metric getter func (gc *GisCache) GetInfluxMetric(metric string, tags map[string]string, fields []string, duration string, count int) (values []map[string]interface{}, err error) { if gc.influxClient == nil { return values, errors.New("Not connected to Influx DB") } // Create query // Fields fieldStr := "" for _, field := range fields { if fieldStr == "" { fieldStr = field } else { fieldStr += "," + field } } if fieldStr == "" { fieldStr = "*" } // Tags tagStr := "" for k, v := range tags { mv := strings.Split(v, ",") if tagStr == "" { tagStr = " WHERE (" // + k + "='" + v + "'" } else { tagStr += " AND (" //+ k + "='" + v + "'" } for i, v := range mv { if i != 0 { tagStr += " OR " } tagStr += k + "='" + v + "'" } tagStr += ")" } if duration != "" { if tagStr == "" { tagStr = " WHERE time > now() - " + duration } else { tagStr += " AND time > now() - " + duration } } // Count countStr := "" if count != 0 { countStr = " LIMIT " + strconv.Itoa(count) } query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc" + countStr log.Debug("QUERY: ", query) // Query store for metric q := influx.NewQuery(query, gc.influxName, "") response, err := (*gc.influxClient).Query(q) if err != nil { log.Error("Query failed with error: ", err.Error()) return values, err } // Process response if len(response.Results) > 0 && len(response.Results[0].Series) > 0 { row := response.Results[0].Series[0] for _, qValues := range row.Values { rValues := make(map[string]interface{}) for index, qVal := range qValues { rValues[row.Columns[index]] = qVal } values = append(values, rValues) } } return values, nil } */ func (ms *MetricStore) formatCachedUeMetric(values map[string]interface{}) (metric UeMetric, err error) { func (ms *MetricStore) formatCachedGisMetric(values map[string]interface{}) (metric GisMetric, err error) { var ok bool var val interface{} // Process field values if val, ok = values[UeMetSrc]; !ok { if val, ok = values[GisMetSrc]; !ok { val = "" } metric.Src = val.(string) if val, ok = values[UeMetSrcType]; !ok { if val, ok = values[GisMetSrcType]; !ok { val = "" } metric.SrcType = val.(string) if val, ok = values[UeMetDest]; !ok { if val, ok = values[GisMetDest]; !ok { val = "" } metric.Dest = val.(string) if val, ok = values[UeMetDestType]; !ok { if val, ok = values[GisMetDestType]; !ok { val = "" } metric.DestType = val.(string) if val, ok = values[UeMetRssi]; !ok { if val, ok = values[GisMetRssi]; !ok { val = "" } rssi := StrToFloat64(val.(string)) metric.Rssi = int32(rssi) if val, ok = values[UeMetRsrp]; !ok { if val, ok = values[GisMetRsrp]; !ok { val = "" } rsrp := StrToFloat64(val.(string)) metric.Rsrp = int32(rsrp) if val, ok = values[UeMetRsrq]; !ok { if val, ok = values[GisMetRsrq]; !ok { val = "" } rsrq := StrToFloat64(val.(string)) metric.Rsrq = int32(rsrq) if val, ok = values[UeMetDistance]; !ok { if val, ok = values[GisMetDistance]; !ok { val = "" } metric.Distance = StrToFloat64(val.(string)) Loading Loading @@ -256,91 +137,53 @@ func (gc *GisCache) getMetricsEntryHandler(key string, fields map[string]string, return nil } */ func (ms *MetricStore) TakeUeMetricSnapshot() { func (ms *MetricStore) TakeGisMetricSnapshot() { // start = time.Now() // Get all cached metrics valuesArray, err := ms.getGisCacheRedisMetric(UeMetName, "*") valuesArray, err := ms.getGisCacheRedisMetric(GisMetName, "*") if err != nil { log.Error("Failed to retrieve metrics with error: ", err.Error()) return } // Prepare ue metrics list metricSignalList := make([]Metric, len(valuesArray)) metricDistanceList := make([]Metric, len(valuesArray)) for index, values := range valuesArray { // Prepare gis metrics list (for signal and distance type of measurements // Each result from redis will create 2 entries in influxdb gisMetricList := make([]Metric, 2*len(valuesArray)) index := 0 for _, values := range valuesArray { // Format network metric nm, err := ms.formatCachedUeMetric(values) nm, err := ms.formatCachedGisMetric(values) if err != nil { continue } // Add metric to list metricSignal := &metricSignalList[index] metricSignal.Name = UeMetNameInflux metricSignal.Tags = map[string]string{UeMetSrc: nm.Src, UeMetSrcType: nm.SrcType, UeMetDest: nm.Dest, UeMetDestType: nm.DestType, UeMetMeasType: UeMetMeasTypeSignal} metricSignal := &gisMetricList[index] index++ metricSignal.Name = GisMetNameInflux metricSignal.Tags = map[string]string{GisMetSrc: nm.Src, GisMetSrcType: nm.SrcType, GisMetDest: nm.Dest, GisMetDestType: nm.DestType, GisMetMeasType: GisMetMeasTypeSignal} metricSignal.Fields = map[string]interface{}{ UeMetRssi: nm.Rssi, UeMetRsrp: nm.Rsrp, UeMetRsrq: nm.Rsrq, } metricDistance := &metricDistanceList[index] metricDistance.Name = UeMetNameInflux metricDistance.Tags = map[string]string{UeMetSrc: nm.Src, UeMetSrcType: nm.SrcType, UeMetDest: nm.Dest, UeMetDestType: nm.DestType, UeMetMeasType: UeMetMeasTypeDistance} GisMetRssi: nm.Rssi, GisMetRsrp: nm.Rsrp, GisMetRsrq: nm.Rsrq, } metricDistance := &gisMetricList[index] index++ metricDistance.Name = GisMetNameInflux metricDistance.Tags = map[string]string{GisMetSrc: nm.Src, GisMetSrcType: nm.SrcType, GisMetDest: nm.Dest, GisMetDestType: nm.DestType, GisMetMeasType: GisMetMeasTypeDistance} metricDistance.Fields = map[string]interface{}{ UeMetDistance: nm.Distance, GisMetDistance: nm.Distance, } } // Store metrics in influx err = ms.SetInfluxMetric(metricSignalList) if err != nil { log.Error("Fail to write influx metrics with error: ", err.Error()) } // Store metrics in influx err = ms.SetInfluxMetric(metricDistanceList) err = ms.SetInfluxMetric(gisMetricList) if err != nil { log.Error("Fail to write influx metrics with error: ", err.Error()) } // logTimeLapse("Write to Influx") } /* // CreateInfluxDb - func (gc *GisCache) CreateInfluxDb() error { if gc.influxName != "" { // Create new DB if necessary if gc.influxClient != nil { q := influx.NewQuery("CREATE DATABASE "+gc.influxName, "", "") _, err := (*gc.influxClient).Query(q) if err != nil { log.Error("Query failed with error: ", err.Error()) return err } } } else { log.Error("Nil influxDbName") } log.Info("Influx database ", gc.influxName, " created") return nil } */ // FlushInfluxDb - //func (gc *GisCache) FlushInfluxDb() { // // Flush Influx DB // if gc.influxClient != nil { // q := influx.NewQuery("DROP SERIES FROM /.*/", gc.influxName, "") // response, err := (*gc.influxClient).Query(q) // if err != nil { // log.Error("Query failed with error: ", err.Error()) // } // log.Info(response.Results) // } //} go-packages/meep-metrics/metric-store.go +2 −2 File changed.Contains only whitespace changes. Show changes Loading
go-apps/meep-gis-engine/server/gis-engine.go +1 −1 Original line number Diff line number Diff line Loading @@ -1350,7 +1350,7 @@ func (ge *GisEngine) StartSnapshotThread() error { go func() { for range ge.snapshotTicker.C { if ge.metricStore != nil { ge.metricStore.TakeUeMetricSnapshot() ge.metricStore.TakeGisMetricSnapshot() } } }() Loading
go-packages/meep-metrics/gis.go +46 −203 Original line number Diff line number Diff line Loading @@ -18,35 +18,26 @@ package metrics import ( "errors" // "strconv" // "strings" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" // influx "github.com/influxdata/influxdb1-client/v2" ) const UeMetName = "meas" const UeMetNameInflux = "gis" const UeMetSrc = "src" const UeMetSrcType = "srcType" const UeMetDest = "dest" const UeMetDestType = "destType" const UeMetMeasType = "measType" const UeMetMeasTypeDistance = "distance" const UeMetMeasTypeSignal = "signal" const UeMetRssi = "rssi" const UeMetRsrp = "rsrp" const UeMetRsrq = "rsrq" const UeMetDistance = "dist" const UeMetTime = "time" /*type Metric struct { Name string Tags map[string]string Fields map[string]interface{} } */ type UeMetric struct { const GisMetName = "meas" const GisMetNameInflux = "gis" const GisMetSrc = "src" const GisMetSrcType = "srcType" const GisMetDest = "dest" const GisMetDestType = "destType" const GisMetMeasType = "measType" const GisMetMeasTypeDistance = "distance" const GisMetMeasTypeSignal = "signal" const GisMetRssi = "rssi" const GisMetRsrp = "rsrp" const GisMetRsrq = "rsrq" const GisMetDistance = "dist" const GisMetTime = "time" type GisMetric struct { Src string SrcType string Dest string Loading @@ -57,161 +48,51 @@ type UeMetric struct { Rsrq int32 Distance float64 } /* // SetInfluxMetric - Generic metric setter func (gc *GisCache) SetInfluxMetric(metricList []Metric) error { if gc.influxClient == nil { return errors.New("Not connected to Influx DB") } // Create a new point batch bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{ Database: gc.influxName, Precision: "ns", }) // Create & add points to batch for _, metric := range metricList { pt, err := influx.NewPoint(metric.Name, metric.Tags, metric.Fields) if err != nil { log.Error("Failed to create point with error: ", err) return err } bp.AddPoint(pt) } // Write the batch err := (*gc.influxClient).Write(bp) if err != nil { log.Error("Failed to write point with error: ", err) return err } return nil } // GetInfluxMetric - Generic metric getter func (gc *GisCache) GetInfluxMetric(metric string, tags map[string]string, fields []string, duration string, count int) (values []map[string]interface{}, err error) { if gc.influxClient == nil { return values, errors.New("Not connected to Influx DB") } // Create query // Fields fieldStr := "" for _, field := range fields { if fieldStr == "" { fieldStr = field } else { fieldStr += "," + field } } if fieldStr == "" { fieldStr = "*" } // Tags tagStr := "" for k, v := range tags { mv := strings.Split(v, ",") if tagStr == "" { tagStr = " WHERE (" // + k + "='" + v + "'" } else { tagStr += " AND (" //+ k + "='" + v + "'" } for i, v := range mv { if i != 0 { tagStr += " OR " } tagStr += k + "='" + v + "'" } tagStr += ")" } if duration != "" { if tagStr == "" { tagStr = " WHERE time > now() - " + duration } else { tagStr += " AND time > now() - " + duration } } // Count countStr := "" if count != 0 { countStr = " LIMIT " + strconv.Itoa(count) } query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc" + countStr log.Debug("QUERY: ", query) // Query store for metric q := influx.NewQuery(query, gc.influxName, "") response, err := (*gc.influxClient).Query(q) if err != nil { log.Error("Query failed with error: ", err.Error()) return values, err } // Process response if len(response.Results) > 0 && len(response.Results[0].Series) > 0 { row := response.Results[0].Series[0] for _, qValues := range row.Values { rValues := make(map[string]interface{}) for index, qVal := range qValues { rValues[row.Columns[index]] = qVal } values = append(values, rValues) } } return values, nil } */ func (ms *MetricStore) formatCachedUeMetric(values map[string]interface{}) (metric UeMetric, err error) { func (ms *MetricStore) formatCachedGisMetric(values map[string]interface{}) (metric GisMetric, err error) { var ok bool var val interface{} // Process field values if val, ok = values[UeMetSrc]; !ok { if val, ok = values[GisMetSrc]; !ok { val = "" } metric.Src = val.(string) if val, ok = values[UeMetSrcType]; !ok { if val, ok = values[GisMetSrcType]; !ok { val = "" } metric.SrcType = val.(string) if val, ok = values[UeMetDest]; !ok { if val, ok = values[GisMetDest]; !ok { val = "" } metric.Dest = val.(string) if val, ok = values[UeMetDestType]; !ok { if val, ok = values[GisMetDestType]; !ok { val = "" } metric.DestType = val.(string) if val, ok = values[UeMetRssi]; !ok { if val, ok = values[GisMetRssi]; !ok { val = "" } rssi := StrToFloat64(val.(string)) metric.Rssi = int32(rssi) if val, ok = values[UeMetRsrp]; !ok { if val, ok = values[GisMetRsrp]; !ok { val = "" } rsrp := StrToFloat64(val.(string)) metric.Rsrp = int32(rsrp) if val, ok = values[UeMetRsrq]; !ok { if val, ok = values[GisMetRsrq]; !ok { val = "" } rsrq := StrToFloat64(val.(string)) metric.Rsrq = int32(rsrq) if val, ok = values[UeMetDistance]; !ok { if val, ok = values[GisMetDistance]; !ok { val = "" } metric.Distance = StrToFloat64(val.(string)) Loading Loading @@ -256,91 +137,53 @@ func (gc *GisCache) getMetricsEntryHandler(key string, fields map[string]string, return nil } */ func (ms *MetricStore) TakeUeMetricSnapshot() { func (ms *MetricStore) TakeGisMetricSnapshot() { // start = time.Now() // Get all cached metrics valuesArray, err := ms.getGisCacheRedisMetric(UeMetName, "*") valuesArray, err := ms.getGisCacheRedisMetric(GisMetName, "*") if err != nil { log.Error("Failed to retrieve metrics with error: ", err.Error()) return } // Prepare ue metrics list metricSignalList := make([]Metric, len(valuesArray)) metricDistanceList := make([]Metric, len(valuesArray)) for index, values := range valuesArray { // Prepare gis metrics list (for signal and distance type of measurements // Each result from redis will create 2 entries in influxdb gisMetricList := make([]Metric, 2*len(valuesArray)) index := 0 for _, values := range valuesArray { // Format network metric nm, err := ms.formatCachedUeMetric(values) nm, err := ms.formatCachedGisMetric(values) if err != nil { continue } // Add metric to list metricSignal := &metricSignalList[index] metricSignal.Name = UeMetNameInflux metricSignal.Tags = map[string]string{UeMetSrc: nm.Src, UeMetSrcType: nm.SrcType, UeMetDest: nm.Dest, UeMetDestType: nm.DestType, UeMetMeasType: UeMetMeasTypeSignal} metricSignal := &gisMetricList[index] index++ metricSignal.Name = GisMetNameInflux metricSignal.Tags = map[string]string{GisMetSrc: nm.Src, GisMetSrcType: nm.SrcType, GisMetDest: nm.Dest, GisMetDestType: nm.DestType, GisMetMeasType: GisMetMeasTypeSignal} metricSignal.Fields = map[string]interface{}{ UeMetRssi: nm.Rssi, UeMetRsrp: nm.Rsrp, UeMetRsrq: nm.Rsrq, } metricDistance := &metricDistanceList[index] metricDistance.Name = UeMetNameInflux metricDistance.Tags = map[string]string{UeMetSrc: nm.Src, UeMetSrcType: nm.SrcType, UeMetDest: nm.Dest, UeMetDestType: nm.DestType, UeMetMeasType: UeMetMeasTypeDistance} GisMetRssi: nm.Rssi, GisMetRsrp: nm.Rsrp, GisMetRsrq: nm.Rsrq, } metricDistance := &gisMetricList[index] index++ metricDistance.Name = GisMetNameInflux metricDistance.Tags = map[string]string{GisMetSrc: nm.Src, GisMetSrcType: nm.SrcType, GisMetDest: nm.Dest, GisMetDestType: nm.DestType, GisMetMeasType: GisMetMeasTypeDistance} metricDistance.Fields = map[string]interface{}{ UeMetDistance: nm.Distance, GisMetDistance: nm.Distance, } } // Store metrics in influx err = ms.SetInfluxMetric(metricSignalList) if err != nil { log.Error("Fail to write influx metrics with error: ", err.Error()) } // Store metrics in influx err = ms.SetInfluxMetric(metricDistanceList) err = ms.SetInfluxMetric(gisMetricList) if err != nil { log.Error("Fail to write influx metrics with error: ", err.Error()) } // logTimeLapse("Write to Influx") } /* // CreateInfluxDb - func (gc *GisCache) CreateInfluxDb() error { if gc.influxName != "" { // Create new DB if necessary if gc.influxClient != nil { q := influx.NewQuery("CREATE DATABASE "+gc.influxName, "", "") _, err := (*gc.influxClient).Query(q) if err != nil { log.Error("Query failed with error: ", err.Error()) return err } } } else { log.Error("Nil influxDbName") } log.Info("Influx database ", gc.influxName, " created") return nil } */ // FlushInfluxDb - //func (gc *GisCache) FlushInfluxDb() { // // Flush Influx DB // if gc.influxClient != nil { // q := influx.NewQuery("DROP SERIES FROM /.*/", gc.influxName, "") // response, err := (*gc.influxClient).Query(q) // if err != nil { // log.Error("Query failed with error: ", err.Error()) // } // log.Info(response.Results) // } //}
go-packages/meep-metrics/metric-store.go +2 −2 File changed.Contains only whitespace changes. Show changes