Newer
Older
Kevin Di Lallo
committed
/*
* Copyright (c) 2019 InterDigital Communications, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metricstore
import (
	"encoding/json"
	"errors"
	"strings"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)
// Names used when reading and writing network metrics: the metric name, the
// tag keys, and the keys of the individual values.
const (
	// NetMetName is the metric name under which network metrics are stored.
	NetMetName = "network"

	// NetMetSrc and NetMetDst are the tag keys identifying the two endpoints.
	NetMetSrc = "src"
	NetMetDst = "dest"

	// NetMetTime is the key of the timestamp in values returned by the store.
	NetMetTime = "time"

	// Field keys for the measured values.
	NetMetLatency      = "lat"
	NetMetULThroughput = "ul"
	NetMetDLThroughput = "dl"
	NetMetULPktLoss    = "ulos"
	NetMetDLPktLoss    = "dlos"

	// NetMetKey is the key under which cached values carry their storage key.
	NetMetKey = "key"
)
// NetworkMetric holds one set of network metric values for traffic between a
// source and a destination.
type NetworkMetric struct {
	Src    string      // source endpoint
	Dst    string      // destination endpoint
	Time   interface{} // raw timestamp value as returned by the store
	Lat    int32       // latency
	UlTput float64     // uplink throughput
	DlTput float64     // downlink throughput
	UlLoss float64     // uplink packet loss
	DlLoss float64     // downlink packet loss
}
func (ms *MetricStore) SetCachedNetworkMetric(metric NetworkMetric) (err error) {
Kevin Di Lallo
committed
// Set ingress stats
tagStr := metric.Src + ":" + metric.Dst
Kevin Di Lallo
committed
fields := map[string]interface{}{NetMetULThroughput: metric.UlTput, NetMetULPktLoss: metric.UlLoss}
err = ms.SetRedisMetric(NetMetName, tagStr, fields)
if err != nil {
log.Error("Failed to set ingress stats with error: ", err.Error())
return
}
// Set egress stats
tagStr = metric.Dst + ":" + metric.Src
Kevin Di Lallo
committed
fields = map[string]interface{}{NetMetLatency: metric.Lat, NetMetDLThroughput: metric.UlTput, NetMetDLPktLoss: metric.UlLoss}
err = ms.SetRedisMetric(NetMetName, tagStr, fields)
if err != nil {
log.Error("Failed to set ingress stats with error: ", err.Error())
return
}
return nil
Kevin Di Lallo
committed
}
func (ms *MetricStore) GetCachedNetworkMetric(src string, dst string) (metric NetworkMetric, err error) {
Kevin Di Lallo
committed
// Make sure we have set a store
if ms.name == "" {
err = errors.New("Store name not specified")
return
}
tagStr := src + ":" + dst
Kevin Di Lallo
committed
fields := []string{NetMetLatency, NetMetULThroughput, NetMetDLThroughput, NetMetULPktLoss, NetMetDLPktLoss}
Kevin Di Lallo
committed
var valuesArray []map[string]interface{}
valuesArray, err = ms.GetRedisMetric(NetMetName, tagStr, fields)
Kevin Di Lallo
committed
if err != nil {
log.Error("Failed to retrieve metrics with error: ", err.Error())
return
}
if len(valuesArray) != 1 {
err = errors.New("Metric list length != 1")
// Return formatted metric
return ms.formatCachedNetworkMetric(valuesArray[0])
func (ms *MetricStore) SetNetworkMetric(nm NetworkMetric) error {
metricList := make([]Metric, 1)
metric := &metricList[0]
metric.Name = NetMetName
metric.Tags = map[string]string{NetMetSrc: nm.Src, NetMetDst: nm.Dst}
metric.Fields = map[string]interface{}{
NetMetLatency: nm.Lat,
NetMetULThroughput: nm.UlTput,
NetMetDLThroughput: nm.DlTput,
NetMetULPktLoss: nm.UlLoss,
NetMetDLPktLoss: nm.DlLoss,
Kevin Di Lallo
committed
}
return ms.SetInfluxMetric(metricList)
Kevin Di Lallo
committed
}
func (ms *MetricStore) GetNetworkMetric(src string, dst string, duration string, count int) (metrics []NetworkMetric, err error) {
Kevin Di Lallo
committed
// Make sure we have set a store
if ms.name == "" {
err = errors.New("Store name not specified")
return
}
tags := map[string]string{NetMetSrc: src, NetMetDst: dst}
Kevin Di Lallo
committed
fields := []string{NetMetLatency, NetMetULThroughput, NetMetDLThroughput, NetMetULPktLoss, NetMetDLPktLoss}
Kevin Di Lallo
committed
var valuesArray []map[string]interface{}
valuesArray, err = ms.GetInfluxMetric(NetMetName, tags, fields, duration, count)
Kevin Di Lallo
committed
if err != nil {
log.Error("Failed to retrieve metrics with error: ", err.Error())
return
}
// Format network metrics
metrics = make([]NetworkMetric, len(valuesArray))
for index, values := range valuesArray {
metrics[index].Src = src
metrics[index].Dst = dst
metrics[index].Time = values[NetMetTime]
metrics[index].Lat = JsonNumToInt32(values[NetMetLatency].(json.Number))
Kevin Di Lallo
committed
metrics[index].UlTput = JsonNumToFloat64(values[NetMetULThroughput].(json.Number))
metrics[index].DlTput = JsonNumToFloat64(values[NetMetDLThroughput].(json.Number))
metrics[index].UlLoss = JsonNumToFloat64(values[NetMetULPktLoss].(json.Number))
metrics[index].DlLoss = JsonNumToFloat64(values[NetMetDLPktLoss].(json.Number))
func (ms *MetricStore) formatCachedNetworkMetric(values map[string]interface{}) (metric NetworkMetric, err error) {
Kevin Di Lallo
committed
var ok bool
var val interface{}
// Process field values
Kevin Di Lallo
committed
if val, ok = values[NetMetLatency]; !ok {
val = ""
}
metric.Lat = StrToInt32(val.(string))
if val, ok = values[NetMetULThroughput]; !ok {
val = ""
}
metric.UlTput = StrToFloat64(val.(string))
if val, ok = values[NetMetDLThroughput]; !ok {
val = ""
}
metric.DlTput = StrToFloat64(val.(string))
if val, ok = values[NetMetULPktLoss]; !ok {
val = ""
}
metric.UlLoss = StrToFloat64(val.(string))
if val, ok = values[NetMetDLPktLoss]; !ok {
val = ""
}
metric.DlLoss = StrToFloat64(val.(string))
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
// Retrieve Src & Dst from key
if key, ok := values[NetMetKey]; ok {
subKey := strings.Split(key.(string), ":")
metric.Src = subKey[2]
metric.Dst = subKey[3]
} else {
return metric, errors.New("")
}
return metric, nil
}
// takeNetworkMetricSnapshot reads the cached network metrics from Redis using
// the wildcard tag "*" and writes them to Influx in a single call.
//
// Entries that cannot be formatted are skipped and are not written. Failures
// of the Redis read or the Influx write are logged; nothing is returned.
func (ms *MetricStore) takeNetworkMetricSnapshot() {
	// start = time.Now()

	// Get all cached network metrics
	fields := []string{NetMetLatency, NetMetULThroughput, NetMetDLThroughput, NetMetULPktLoss, NetMetDLPktLoss}
	valuesArray, err := ms.GetRedisMetric(NetMetName, "*", fields)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// logTimeLapse("GetRedisMetric wildcard")

	// Prepare network metrics list. Append instead of indexing so that a
	// skipped entry does not leave a zero-valued Metric in the list.
	metricList := make([]Metric, 0, len(valuesArray))
	for _, values := range valuesArray {
		// Format network metric
		nm, fmtErr := ms.formatCachedNetworkMetric(values)
		if fmtErr != nil {
			continue
		}

		// Add metric to list
		metricList = append(metricList, Metric{
			Name: NetMetName,
			Tags: map[string]string{NetMetSrc: nm.Src, NetMetDst: nm.Dst},
			Fields: map[string]interface{}{
				NetMetLatency:      nm.Lat,
				NetMetULThroughput: nm.UlTput,
				NetMetDLThroughput: nm.DlTput,
				NetMetULPktLoss:    nm.UlLoss,
				NetMetDLPktLoss:    nm.DlLoss,
			},
		})
	}

	// Store metrics in influx
	err = ms.SetInfluxMetric(metricList)
	if err != nil {
		log.Error("Fail to write influx metrics with error: ", err.Error())
	}

	// logTimeLapse("Write to Influx")
}