Commit 3d27b084 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

added method to set store + improved implementation & UT

parent c2dc3943
Loading
Loading
Loading
Loading
+36 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"encoding/json"
	"strconv"
)

func JsonNumToInt32(num json.Number) (val int32) {
	if intVal, err := strconv.Atoi(num.String()); err == nil {
		val = int32(intVal)
	}
	return val
}

func JsonNumToInt64(num json.Number) (val int64) {
	if intVal, err := num.Int64(); err == nil {
		val = intVal
	}
	return val
}
+100 −20
Original line number Diff line number Diff line
@@ -17,7 +17,9 @@
package metricstore

import (
	"encoding/json"
	"errors"
	"strconv"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
@@ -43,7 +45,6 @@ type MetricStore struct {
// NewMetricStore - Creates and initialize a Metric Store instance
func NewMetricStore(name string, addr string) (ms *MetricStore, err error) {
	ms = new(MetricStore)
	ms.name = name

	// Connect to Influx DB
	for retry := 0; !ms.connected && retry <= dbMaxRetryCount; retry++ {
@@ -56,13 +57,12 @@ func NewMetricStore(name string, addr string) (ms *MetricStore, err error) {
		return nil, err
	}

	// Create Store DB if it does not exist
	q := influxclient.NewQuery("CREATE DATABASE "+name, "", "")
	response, err := (*ms.client).Query(q)
	// Set store to use
	err = ms.SetStore(name)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		log.Error("Failed to set store: ", err.Error())
		return nil, err
	}
	log.Info(response.Results)

	log.Info("Successfully connected to Influx DB")
	return ms, nil
@@ -95,8 +95,27 @@ func (ms *MetricStore) connectDB(addr string) error {
	return nil
}

// SetStore -
func (ms *MetricStore) SetStore(name string) error {
	// Set current store. Create new DB if necessary.
	if name != "" {
		q := influxclient.NewQuery("CREATE DATABASE "+name, "", "")
		_, err := (*ms.client).Query(q)
		if err != nil {
			log.Error("Query failed with error: ", err.Error())
			return err
		}
		ms.name = name
	}
	return nil
}

// Flush
func (ms *MetricStore) Flush() {
	// Make sure we have set a store
	if ms.name == "" {
		return
	}

	// Create Store DB if it does not exist
	q := influxclient.NewQuery("DROP SERIES FROM /.*/", ms.name, "")
@@ -109,6 +128,12 @@ func (ms *MetricStore) Flush() {

// SetMetric - Generic metric setter
func (ms *MetricStore) SetMetric(metric string, tags map[string]string, fields map[string]interface{}) error {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return err
	}

	// Create a new point batch
	bp, _ := influxclient.NewBatchPoints(influxclient.BatchPointsConfig{
		Database:  ms.name,
@@ -133,13 +158,62 @@ func (ms *MetricStore) SetMetric(metric string, tags map[string]string, fields m
}

// GetMetric - Generic metric getter
func (ms *MetricStore) GetMetric(metric string) {
	q := influxclient.NewQuery("SELECT * FROM "+metric, ms.name, "")
func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields []string, count int) (values []map[string]interface{}, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return values, err
	}

	// Create query
	fieldStr := ""
	for _, field := range fields {
		if fieldStr == "" {
			fieldStr = field
		} else {
			fieldStr += "," + field
		}
	}
	if fieldStr == "" {
		fieldStr = "*"
	}
	tagStr := ""
	for k, v := range tags {
		if tagStr == "" {
			tagStr = " WHERE " + k + "='" + v + "'"
		} else {
			tagStr += " AND " + k + "='" + v + "'"
		}
	}
	query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc LIMIT " + strconv.Itoa(count)
	log.Error("QUERY: ", query)

	// Query store for metric
	q := influxclient.NewQuery(query, ms.name, "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		return values, err
	}

	// Process response
	if len(response.Results) <= 0 || len(response.Results[0].Series) <= 0 {
		err = errors.New("Query returned no results")
		log.Error("Query failed with error: ", err.Error())
		return values, err
	}

	// Read results
	row := response.Results[0].Series[0]
	for _, qValues := range row.Values {
		rValues := make(map[string]interface{})
		for index, qVal := range qValues {
			rValues[row.Columns[index]] = qVal
		}
		values = append(values, rValues)
	}
	log.Error(response.Results)

	return values, nil
}

// SetNetMetric
@@ -158,23 +232,29 @@ func (ms *MetricStore) SetNetMetric(src string, dest string, lat int32, tput int

// GetNetMetric
func (ms *MetricStore) GetLastNetMetric(src string, dest string) (lat int32, tput int32, loss int64, err error) {
	query := "SELECT lat,tput,loss FROM " + metricNet + " WHERE src='" + src + "' AND dest='" + dest + "' ORDER BY desc LIMIT 1"
	q := influxclient.NewQuery(query, ms.name, "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return lat, tput, loss, err
	}
	log.Error(response.Results)

	if len(response.Results[0].Series) == 0 {
		err = errors.New("Query returned no results")
		log.Error("Query failed with error: ", err.Error())
	// Get latest Net metric
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := []string{"lat", "tput", "loss"}
	valuesArray, err := ms.GetMetric(metricNet, tags, fields, 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return lat, tput, loss, err
	}

	// Read results
	// response.Results[]
	// Take first & only values
	values := valuesArray[0]
	lat = JsonNumToInt32(values["lat"].(json.Number))
	tput = JsonNumToInt32(values["tput"].(json.Number))
	loss = JsonNumToInt64(values["loss"].(json.Number))

	return lat, tput, loss, nil
}
+114 −14
Original line number Diff line number Diff line
@@ -23,7 +23,8 @@ import (
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const metricStoreName string = "metricStore"
const metricStore1Name string = "metricStore1"
const metricStore2Name string = "metricStore2"
const metricStoreAddr string = "http://localhost:30386"

func TestNewMetricStore(t *testing.T) {
@@ -32,16 +33,39 @@ func TestNewMetricStore(t *testing.T) {

	// Keep this one first...
	fmt.Println("Invalid Metric Store address")
	_, err := NewMetricStore(metricStoreName, "ExpectedFailure-InvalidStoreAddr")
	ms, err := NewMetricStore("", "ExpectedFailure-InvalidStoreAddr")
	if err == nil {
		t.Errorf("Should report error on invalid store addr")
	}
	if ms != nil {
		t.Errorf("Should have a nil metric store")
	}

	fmt.Println("Create valid Metric Store")
	_, err = NewMetricStore(metricStoreName, metricStoreAddr)
	ms, err = NewMetricStore("", metricStoreAddr)
	if err != nil {
		t.Errorf("Unable to create Metric Store")
	}
	fmt.Println("Invoke API before setting store")
	_, _, _, err = ms.GetLastNetMetric("node1", "node2")
	if err == nil {
		t.Errorf("API call should fail if no store is set")
	}
	err = ms.SetNetMetric("node1", "node2", 1, 2, 3)
	if err == nil {
		t.Errorf("API call should fail if no store is set")
	}

	fmt.Println("Set store")
	err = ms.SetStore(metricStore1Name)
	if err != nil {
		t.Errorf("Unable to set Store")
	}
	fmt.Println("Set store2")
	err = ms.SetStore(metricStore2Name)
	if err != nil {
		t.Errorf("Unable to set Store2")
	}

	// t.Errorf("DONE")
}
@@ -51,7 +75,7 @@ func TestGetSetMetric(t *testing.T) {
	log.MeepTextLogInit(t.Name())

	fmt.Println("Create valid Metric Store")
	ms, err := NewMetricStore(metricStoreName, metricStoreAddr)
	ms, err := NewMetricStore(metricStore1Name, metricStoreAddr)
	if err != nil {
		t.Errorf("Unable to create Metric Store")
	}
@@ -60,27 +84,103 @@ func TestGetSetMetric(t *testing.T) {
	ms.Flush()

	fmt.Println("Get empty metric")
	_, _, _, err = ms.GetLastNetMetric("node1", "node2")
	if err == nil {
	lat, tput, loss, err := ms.GetLastNetMetric("node1", "node2")
	if err == nil || lat != 0 || tput != 0 || loss != 0 {
		t.Errorf("Net metric should not exist")
	}

	fmt.Println("Set net metric")
	err = ms.SetNetMetric("node1", "node2", 1, 2, 3)
	fmt.Println("Set net metrics")
	err = ms.SetNetMetric("node1", "node2", 0, 1, 2.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}

	fmt.Println("Set event metric")
	err = ms.SetEventMetric("MOBILITY", "node1", "UE Mobility event")
	err = ms.SetNetMetric("node1", "node3", 1, 2, 3.0)
	if err != nil {
		t.Errorf("Unable to set event metric")
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node2", "node1", 2, 3, 4.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node2", "node3", 3, 4, 5.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node3", "node1", 4, 5, 6.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node3", "node2", 5, 6, 7.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node1", "node2", 6, 7, 8.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node1", "node3", 7, 8, 9.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node2", "node1", 8, 9, 0.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node2", "node3", 9, 0, 1.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node3", "node1", 0, 1, 2.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetNetMetric("node3", "node2", 1, 2, 3.0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}

	fmt.Println("Get metric")
	_, _, _, err = ms.GetLastNetMetric("node1", "node2")
	fmt.Println("Get net metrics")
	lat, tput, loss, err = ms.GetLastNetMetric("node1", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 6 || tput != 7 || loss != 8.0 {
		t.Errorf("Invalid metric values")
	}
	lat, tput, loss, err = ms.GetLastNetMetric("node1", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 7 || tput != 8 || loss != 9.0 {
		t.Errorf("Invalid metric values")
	}
	lat, tput, loss, err = ms.GetLastNetMetric("node2", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 8 || tput != 9 || loss != 0 {
		t.Errorf("Invalid metric values")
	}
	lat, tput, loss, err = ms.GetLastNetMetric("node2", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 9 || tput != 0 || loss != 1.0 {
		t.Errorf("Invalid metric values")
	}
	lat, tput, loss, err = ms.GetLastNetMetric("node3", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 0 || tput != 1 || loss != 2.0 {
		t.Errorf("Invalid metric values")
	}
	lat, tput, loss, err = ms.GetLastNetMetric("node3", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 1 || tput != 2 || loss != 3.0 {
		t.Errorf("Invalid metric values")
	}

	fmt.Println("Set event metric")
	err = ms.SetEventMetric("MOBILITY", "node1", "UE Mobility event")
	if err != nil {
		t.Errorf("Unable to set event metric")
	}

	// t.Errorf("DONE")