Newer
Older
Kevin Di Lallo
committed
/*
* Copyright (c) 2019 InterDigital Communications, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metricstore
import (
"encoding/json"
"errors"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)
// Identifiers used for network metrics: the metric name plus the tag and
// field keys under which network metric values are stored and looked up.
const (
	// NetMetName is the metric name passed to the store for network metrics.
	NetMetName = "network"
	// NetMetSrc is the key holding the source of a network metric.
	NetMetSrc = "src"
	// NetMetDst is the key holding the destination of a network metric.
	NetMetDst = "dest"
	// NetMetTime is the key holding the timestamp of a retrieved metric.
	NetMetTime = "time"
	// NetMetLatency is the key holding the latency value.
	NetMetLatency = "lat"
	// NetMetULThroughput is the key holding the uplink throughput value.
	NetMetULThroughput = "ul"
	// NetMetDLThroughput is the key holding the downlink throughput value.
	NetMetDLThroughput = "dl"
	// NetMetULPktLoss is the key holding the uplink packet loss value.
	NetMetULPktLoss = "ulos"
	// NetMetDLPktLoss is the key holding the downlink packet loss value.
	NetMetDLPktLoss = "dlos"
)
// NetworkMetric holds the network metric values for one source/destination
// pair.
type NetworkMetric struct {
	// Src is the source of the measured traffic.
	Src string
	// Dst is the destination of the measured traffic.
	Dst string
	// Time is the timestamp of the sample as returned by the store; it is
	// left untyped because the store's representation is passed through as-is.
	Time interface{}
	// Lat is the latency value.
	Lat int32
	// UlTput is the uplink throughput value.
	UlTput float64
	// DlTput is the downlink throughput value.
	DlTput float64
	// UlLoss is the uplink packet loss value.
	UlLoss float64
	// DlLoss is the downlink packet loss value.
	DlLoss float64
}
Kevin Di Lallo
committed
func (ms *MetricStore) SetCachedNetworkMetric(metric NetworkMetric) (err error) {
Kevin Di Lallo
committed
// Set ingress stats
tagStr := metric.Src + ":" + metric.Dst
Kevin Di Lallo
committed
fields := map[string]interface{}{
NetMetSrc: metric.Src,
NetMetDst: metric.Dst,
NetMetULThroughput: metric.UlTput,
NetMetULPktLoss: metric.UlLoss,
}
Kevin Di Lallo
committed
err = ms.SetRedisMetric(NetMetName, tagStr, fields)
if err != nil {
log.Error("Failed to set ingress stats with error: ", err.Error())
return
}
// Set egress stats
tagStr = metric.Dst + ":" + metric.Src
Kevin Di Lallo
committed
fields = map[string]interface{}{
NetMetSrc: metric.Dst,
NetMetDst: metric.Src,
NetMetLatency: metric.Lat,
NetMetDLThroughput: metric.UlTput,
NetMetDLPktLoss: metric.UlLoss,
}
Kevin Di Lallo
committed
err = ms.SetRedisMetric(NetMetName, tagStr, fields)
if err != nil {
log.Error("Failed to set ingress stats with error: ", err.Error())
return
}
return nil
Kevin Di Lallo
committed
}
func (ms *MetricStore) GetCachedNetworkMetric(src string, dst string) (metric NetworkMetric, err error) {
Kevin Di Lallo
committed
// Make sure we have set a store
if ms.name == "" {
err = errors.New("Store name not specified")
return
}
tagStr := src + ":" + dst
Kevin Di Lallo
committed
var valuesArray []map[string]interface{}
Kevin Di Lallo
committed
valuesArray, err = ms.GetRedisMetric(NetMetName, tagStr)
Kevin Di Lallo
committed
if err != nil {
log.Error("Failed to retrieve metrics with error: ", err.Error())
return
}
if len(valuesArray) != 1 {
err = errors.New("Metric list length != 1")
// Return formatted metric
return ms.formatCachedNetworkMetric(valuesArray[0])
func (ms *MetricStore) SetNetworkMetric(nm NetworkMetric) error {
metricList := make([]Metric, 1)
metric := &metricList[0]
metric.Name = NetMetName
metric.Tags = map[string]string{NetMetSrc: nm.Src, NetMetDst: nm.Dst}
metric.Fields = map[string]interface{}{
NetMetLatency: nm.Lat,
NetMetULThroughput: nm.UlTput,
NetMetDLThroughput: nm.DlTput,
NetMetULPktLoss: nm.UlLoss,
NetMetDLPktLoss: nm.DlLoss,
Kevin Di Lallo
committed
}
return ms.SetInfluxMetric(metricList)
Kevin Di Lallo
committed
}
func (ms *MetricStore) GetNetworkMetric(src string, dst string, duration string, count int) (metrics []NetworkMetric, err error) {
Kevin Di Lallo
committed
// Make sure we have set a store
if ms.name == "" {
err = errors.New("Store name not specified")
return
}
tags := map[string]string{NetMetSrc: src, NetMetDst: dst}
Kevin Di Lallo
committed
fields := []string{NetMetLatency, NetMetULThroughput, NetMetDLThroughput, NetMetULPktLoss, NetMetDLPktLoss}
Kevin Di Lallo
committed
var valuesArray []map[string]interface{}
valuesArray, err = ms.GetInfluxMetric(NetMetName, tags, fields, duration, count)
Kevin Di Lallo
committed
if err != nil {
log.Error("Failed to retrieve metrics with error: ", err.Error())
return
}
// Format network metrics
metrics = make([]NetworkMetric, len(valuesArray))
for index, values := range valuesArray {
metrics[index].Src = src
metrics[index].Dst = dst
metrics[index].Time = values[NetMetTime]
metrics[index].Lat = JsonNumToInt32(values[NetMetLatency].(json.Number))
Kevin Di Lallo
committed
metrics[index].UlTput = JsonNumToFloat64(values[NetMetULThroughput].(json.Number))
metrics[index].DlTput = JsonNumToFloat64(values[NetMetDLThroughput].(json.Number))
metrics[index].UlLoss = JsonNumToFloat64(values[NetMetULPktLoss].(json.Number))
metrics[index].DlLoss = JsonNumToFloat64(values[NetMetDLPktLoss].(json.Number))
func (ms *MetricStore) formatCachedNetworkMetric(values map[string]interface{}) (metric NetworkMetric, err error) {
Kevin Di Lallo
committed
var ok bool
var val interface{}
// Process field values
Kevin Di Lallo
committed
if val, ok = values[NetMetSrc]; !ok {
val = ""
}
metric.Src = val.(string)
if val, ok = values[NetMetDst]; !ok {
val = ""
}
metric.Dst = val.(string)
Kevin Di Lallo
committed
if val, ok = values[NetMetLatency]; !ok {
val = ""
}
metric.Lat = StrToInt32(val.(string))
if val, ok = values[NetMetULThroughput]; !ok {
val = ""
}
metric.UlTput = StrToFloat64(val.(string))
if val, ok = values[NetMetDLThroughput]; !ok {
val = ""
}
metric.DlTput = StrToFloat64(val.(string))
if val, ok = values[NetMetULPktLoss]; !ok {
val = ""
}
metric.UlLoss = StrToFloat64(val.(string))
if val, ok = values[NetMetDLPktLoss]; !ok {
val = ""
}
metric.DlLoss = StrToFloat64(val.(string))
return metric, nil
}
func (ms *MetricStore) takeNetworkMetricSnapshot() {
// start = time.Now()
// Get all cached network metrics
Kevin Di Lallo
committed
valuesArray, err := ms.GetRedisMetric(NetMetName, "*")
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
if err != nil {
log.Error("Failed to retrieve metrics with error: ", err.Error())
return
}
// logTimeLapse("GetRedisMetric wildcard")
// Prepare network metrics list
metricList := make([]Metric, len(valuesArray))
for index, values := range valuesArray {
// Format network metric
nm, err := ms.formatCachedNetworkMetric(values)
if err != nil {
continue
}
// Add metric to list
metric := &metricList[index]
metric.Name = NetMetName
metric.Tags = map[string]string{NetMetSrc: nm.Src, NetMetDst: nm.Dst}
metric.Fields = map[string]interface{}{
NetMetLatency: nm.Lat,
NetMetULThroughput: nm.UlTput,
NetMetDLThroughput: nm.DlTput,
NetMetULPktLoss: nm.UlLoss,
NetMetDLPktLoss: nm.DlLoss,
}
}
// Store metrics in influx
err = ms.SetInfluxMetric(metricList)
if err != nil {
log.Error("Fail to write influx metrics with error: ", err.Error())
}
// logTimeLapse("Write to Influx")
}