"go-packages/meep-rnis-client/model_nrcgi.go" did not exist on "a4f7e957ceb70f7a104f0259421358888e2ca280"
Newer
Older
/*
* Copyright (c) 2019 InterDigital Communications, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package metricstore
import (
	"encoding/json"
	"errors"
	"strconv"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	_ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod
	influxclient "github.com/influxdata/influxdb1-client/v2"
)
// Connection retry limit and measurement names used by the metric store.
const (
	// dbMaxRetryCount is the highest retry index used by the connection loop
	// in NewMetricStore (the loop runs while retry <= dbMaxRetryCount).
	dbMaxRetryCount = 2

	// metricNet is the measurement name under which network metrics are stored.
	metricNet = "netmet"
	// metricEvent is the measurement name under which event metrics are stored.
	metricEvent = "events"
)
// MetricStore - Implements a metric store
// on top of an InfluxDB server accessed through the influx client.
type MetricStore struct {
// name is the currently selected store; it is passed as the database name
// on queries and writes. Empty until SetStore succeeds with a non-empty name.
name string
// addr is the InfluxDB address used by the last connection attempt.
addr string
// connected is set to true once a ping to the server has succeeded.
connected bool
// client points to the influx client created by connectDB.
client *influxclient.Client
}
// NewMetricStore - Creates and initialize a Metric Store instance
func NewMetricStore(name string, addr string) (ms *MetricStore, err error) {
ms = new(MetricStore)
// Connect to Influx DB
for retry := 0; !ms.connected && retry <= dbMaxRetryCount; retry++ {
err = ms.connectDB(addr)
if err != nil {
log.Warn("Failed to connect to InfluxDB. Retrying... Error: ", err)
}
}
if err != nil {
return nil, err
}
// Set store to use
err = ms.SetStore(name)
log.Error("Failed to set store: ", err.Error())
return nil, err
}
log.Info("Successfully connected to Influx DB")
return ms, nil
}
// connectDB - Creates an influx HTTP client for addr and verifies the server
// is reachable with a ping (1 second timeout).
//
// An empty addr selects the default "http://influxdb:8086". On success the
// client is stored in ms.client and ms.connected is set; on failure the
// error from client creation or from the ping is returned.
func (ms *MetricStore) connectDB(addr string) error {
	if addr == "" {
		ms.addr = "http://influxdb:8086"
	} else {
		ms.addr = addr
	}
	log.Debug("InfluxDB Connector connecting to ", ms.addr)

	client, err := influxclient.NewHTTPClient(influxclient.HTTPConfig{Addr: ms.addr})
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", ms.addr)
		return err
	}

	_, version, err := client.Ping(1000 * time.Millisecond)
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", ms.addr)
		// The client is discarded on failure, so release it here. It must
		// NOT be closed on the success path because it is kept in ms.client.
		// The Close error is ignored: the ping error is the one to report.
		_ = client.Close()
		return err
	}

	ms.client = &client
	ms.connected = true
	log.Info("InfluxDB Connector connected to ", ms.addr, " version: ", version)
	return nil
}
// SetStore - Selects the store (database) used by subsequent operations.
//
// When name is non-empty a "CREATE DATABASE <name>" query is issued and, if
// it succeeds, name becomes the current store. An empty name is a no-op that
// leaves the current store unchanged.
//
// Returns the transport error or the first statement error reported by the
// server, if any.
func (ms *MetricStore) SetStore(name string) error {
	// Nothing to select
	if name == "" {
		return nil
	}

	// Set current store. Create new DB if necessary.
	q := influxclient.NewQuery("CREATE DATABASE "+name, "", "")
	response, err := (*ms.client).Query(q)
	if err == nil {
		// A query can be delivered successfully and still fail on the
		// server; that failure is only visible through the response.
		err = response.Error()
	}
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		return err
	}

	ms.name = name
	return nil
}
// Flush - Drops every series from the current store.
//
// Does nothing when no store has been selected. Failures are logged; the
// function has no return value.
func (ms *MetricStore) Flush() {
	// Make sure we have set a store
	if ms.name == "" {
		return
	}

	// Drop the series of all measurements in the store
	q := influxclient.NewQuery("DROP SERIES FROM /.*/", ms.name, "")
	response, err := (*ms.client).Query(q)
	if err == nil {
		// Statement-level failures are reported through the response
		err = response.Error()
	}
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		// response may be nil here, so it must not be dereferenced below
		return
	}

	log.Info(response.Results)
}
// SetMetric - Generic metric setter.
//
// Writes a single point for measurement metric, with the given tags and
// fields, to the current store using microsecond precision.
//
// Returns an error when no store has been selected, or when creating the
// batch, creating the point, or writing the batch fails.
func (ms *MetricStore) SetMetric(metric string, tags map[string]string, fields map[string]interface{}) error {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return err
	}

	// Create a new point batch
	bp, err := influxclient.NewBatchPoints(influxclient.BatchPointsConfig{
		Database:  ms.name,
		Precision: "us",
	})
	if err != nil {
		log.Error("Failed to create batch points with error: ", err)
		return err
	}

	// Create a point and add to batch
	pt, err := influxclient.NewPoint(metric, tags, fields)
	if err != nil {
		log.Error("Failed to create point with error: ", err)
		return err
	}
	bp.AddPoint(pt)

	// Write the batch
	err = (*ms.client).Write(bp)
	if err != nil {
		log.Error("Failed to write point with error: ", err)
		return err
	}
	return nil
}
// GetMetric - Generic metric getter
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields []string, count int) (values []map[string]interface{}, err error) {
// Make sure we have set a store
if ms.name == "" {
err := errors.New("Store name not specified")
return values, err
}
// Create query
fieldStr := ""
for _, field := range fields {
if fieldStr == "" {
fieldStr = field
} else {
fieldStr += "," + field
}
}
if fieldStr == "" {
fieldStr = "*"
}
tagStr := ""
for k, v := range tags {
if tagStr == "" {
tagStr = " WHERE " + k + "='" + v + "'"
} else {
tagStr += " AND " + k + "='" + v + "'"
}
}
query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc LIMIT " + strconv.Itoa(count)
log.Error("QUERY: ", query)
// Query store for metric
q := influxclient.NewQuery(query, ms.name, "")
response, err := (*ms.client).Query(q)
if err != nil {
log.Error("Query failed with error: ", err.Error())
return values, err
}
// Process response
if len(response.Results) <= 0 || len(response.Results[0].Series) <= 0 {
err = errors.New("Query returned no results")
log.Error("Query failed with error: ", err.Error())
return values, err
// Read results
row := response.Results[0].Series[0]
for _, qValues := range row.Values {
rValues := make(map[string]interface{})
for index, qVal := range qValues {
rValues[row.Columns[index]] = qVal
}
values = append(values, rValues)
}
return values, nil
}
// SetNetMetric - Stores one network metric point.
//
// src and dest are written as the "src" and "dest" tags; lat, tput and loss
// are written as the "lat", "tput" and "loss" fields of the metricNet
// measurement. Returns whatever SetMetric returns.
func (ms *MetricStore) SetNetMetric(src string, dest string, lat int32, tput int32, loss int64) error {
	return ms.SetMetric(
		metricNet,
		map[string]string{"src": src, "dest": dest},
		map[string]interface{}{"lat": lat, "tput": tput, "loss": loss},
	)
}
// GetLastNetMetric - Retrieves the most recent network metric for a
// src/dest pair.
//
// It asks GetMetric for a single row of the metricNet measurement filtered
// on the "src" and "dest" tags and converts its "lat", "tput" and "loss"
// columns.
//
// Returns zero values and an error when no store has been selected, when
// the query fails or yields no row, or when any of the three fields is
// missing or is not a json.Number.
func (ms *MetricStore) GetLastNetMetric(src string, dest string) (lat int32, tput int32, loss int64, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return lat, tput, loss, err
	}

	// Get latest Net metric
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := []string{"lat", "tput", "loss"}
	valuesArray, err := ms.GetMetric(metricNet, tags, fields, 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return lat, tput, loss, err
	}

	// A series may come back without rows; indexing would panic
	if len(valuesArray) == 0 {
		err = errors.New("Query returned no results")
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return lat, tput, loss, err
	}

	// Take first & only values. Checked assertions are used because a
	// missing or null field would otherwise panic.
	values := valuesArray[0]
	latNum, latOk := values["lat"].(json.Number)
	tputNum, tputOk := values["tput"].(json.Number)
	lossNum, lossOk := values["loss"].(json.Number)
	if !latOk || !tputOk || !lossOk {
		err = errors.New("Net metric fields missing or not numeric")
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return lat, tput, loss, err
	}

	lat = JsonNumToInt32(latNum)
	tput = JsonNumToInt32(tputNum)
	loss = JsonNumToInt64(lossNum)
	return lat, tput, loss, nil
}
// SetEventMetric - Stores one event metric point.
//
// typ and target are written as the "type" and "target" tags and desc as
// the "description" field of the metricEvent measurement. Returns whatever
// SetMetric returns.
func (ms *MetricStore) SetEventMetric(typ string, target string, desc string) error {
	return ms.SetMetric(
		metricEvent,
		map[string]string{"type": typ, "target": target},
		map[string]interface{}{"description": desc},
	)
}