Commit 473c1b3f authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

added master node affinity to influxdb + start & stop time capabilities in metric store

parent 0ef4f944
Loading
Loading
Loading
Loading
+7 −1
Original line number Diff line number Diff line
@@ -132,7 +132,13 @@ nodeSelector: {}
## Affinity for pod assignment
## Ref: https://kubernetes.io/docs/concepts/configuration/assign-pod-node/#affinity-and-anti-affinity
##
affinity: {}
affinity:
  nodeAffinity:
    requiredDuringSchedulingIgnoredDuringExecution:
      nodeSelectorTerms:
      - matchExpressions:
        - key: node-role.kubernetes.io/master
          operator: Exists

## Tolerations for pod assignment
## Ref: https://kubernetes.io/docs/concepts/configuration/taint-and-toleration/
+63 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"errors"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const metricEvent = "events"

// SetEventMetric
func (ms *MetricStore) SetEventMetric(eventType string, eventStr string) error {
	tags := map[string]string{
		"type": eventType,
	}
	fields := map[string]interface{}{
		"event": eventStr,
	}
	return ms.SetMetric(metricEvent, tags, fields)
}

// GetLastEventMetric
func (ms *MetricStore) GetLastEventMetric(eventType string) (event string, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return event, err
	}

	// Get latest Net metric
	tags := map[string]string{
		"type": eventType,
	}
	fields := []string{"event"}
	valuesArray, err := ms.GetMetric(metricEvent, tags, fields, "", "", 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return event, err
	}

	// Take first & only values
	values := valuesArray[0]
	if val, ok := values["event"].(string); ok {
		event = val
	}
	return event, nil
}
+40 −130
Original line number Diff line number Diff line
@@ -17,7 +17,6 @@
package metricstore

import (
	"encoding/json"
	"errors"
	"strconv"
	"time"
@@ -28,12 +27,9 @@ import (
	influxclient "github.com/influxdata/influxdb1-client/v2"
)

// var start time.Time

const dbMaxRetryCount = 2
const (
	metricLatency = "latency"
	metricTraffic = "traffic"
	metricEvent   = "events"
)

// MetricStore - Implements a metric store
type MetricStore struct {
@@ -149,17 +145,22 @@ func (ms *MetricStore) SetMetric(metric string, tags map[string]string, fields m
	}
	bp.AddPoint(pt)

	// logTimeLapse("Created point: ")

	// Write the batch
	err = (*ms.client).Write(bp)
	if err != nil {
		log.Error("Failed to write point with error: ", err)
		return err
	}

	// logTimeLapse("Write complete: ")

	return nil
}

// GetMetric - Generic metric getter
func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields []string, count int) (values []map[string]interface{}, err error) {
func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields []string, startTime string, stopTime string, count int) (values []map[string]interface{}, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
@@ -167,6 +168,8 @@ func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields [
	}

	// Create query

	// Fields
	fieldStr := ""
	for _, field := range fields {
		if fieldStr == "" {
@@ -178,6 +181,8 @@ func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields [
	if fieldStr == "" {
		fieldStr = "*"
	}

	// Tags
	tagStr := ""
	for k, v := range tags {
		if tagStr == "" {
@@ -186,8 +191,29 @@ func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields [
			tagStr += " AND " + k + "='" + v + "'"
		}
	}
	query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc LIMIT " + strconv.Itoa(count)
	log.Error("QUERY: ", query)
	if startTime != "" {
		if tagStr == "" {
			tagStr = " WHERE time > " + startTime
		} else {
			tagStr += " AND time > " + startTime
		}
	}
	if stopTime != "" {
		if tagStr == "" {
			tagStr = " WHERE time < " + stopTime
		} else {
			tagStr += " AND time < " + stopTime
		}
	}

	// Count
	countStr := ""
	if count != 0 {
		countStr = " LIMIT " + strconv.Itoa(count)
	}

	query := "SELECT " + fieldStr + " FROM " + metric + " " + tagStr + " ORDER BY desc" + countStr
	log.Debug("QUERY: ", query)

	// Query store for metric
	q := influxclient.NewQuery(query, ms.name, "")
@@ -217,124 +243,8 @@ func (ms *MetricStore) GetMetric(metric string, tags map[string]string, fields [
	return values, nil
}

// SetLatencyMetric
func (ms *MetricStore) SetLatencyMetric(src string, dest string, lat int32, mean int32) error {
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := map[string]interface{}{
		"lat":  lat,
		"mean": mean,
	}
	return ms.SetMetric(metricLatency, tags, fields)
}

// GetLastLatencyMetric
func (ms *MetricStore) GetLastLatencyMetric(src string, dest string) (lat int32, mean int32, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err = errors.New("Store name not specified")
		return
	}

	// Get latest Latency metric
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := []string{"lat", "mean"}

	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetMetric(metricLatency, tags, fields, 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// Take first & only values
	values := valuesArray[0]
	lat = JsonNumToInt32(values["lat"].(json.Number))
	mean = JsonNumToInt32(values["mean"].(json.Number))
	return
}

// SetTrafficMetric
func (ms *MetricStore) SetTrafficMetric(src string, dest string, tput float64, loss float64) error {
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := map[string]interface{}{
		"tput": tput,
		"loss": loss,
	}
	return ms.SetMetric(metricTraffic, tags, fields)
}

// GetLastTrafficMetric
func (ms *MetricStore) GetLastTrafficMetric(src string, dest string) (tput float64, loss float64, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err = errors.New("Store name not specified")
		return
	}

	// Get latest Net metric
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := []string{"tput", "loss"}

	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetMetric(metricTraffic, tags, fields, 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// Take first & only values
	values := valuesArray[0]
	tput = JsonNumToFloat64(values["tput"].(json.Number))
	loss = JsonNumToFloat64(values["loss"].(json.Number))
	return
}

// SetEventMetric
func (ms *MetricStore) SetEventMetric(eventType string, eventStr string) error {
	tags := map[string]string{
		"type": eventType,
	}
	fields := map[string]interface{}{
		"event": eventStr,
	}
	return ms.SetMetric(metricEvent, tags, fields)
}

// GetLastEventMetric
func (ms *MetricStore) GetLastEventMetric(eventType string) (event string, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return event, err
	}

	// Get latest Net metric
	tags := map[string]string{
		"type": eventType,
	}
	fields := []string{"event"}
	valuesArray, err := ms.GetMetric(metricEvent, tags, fields, 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return event, err
	}

	// Take first & only values
	values := valuesArray[0]
	if val, ok := values["event"].(string); ok {
		event = val
	}
	return event, nil
}
// func logTimeLapse(logStr string) {
// 	stop := time.Now()
// 	fmt.Println(logStr + strconv.FormatFloat(stop.Sub(start).Seconds()*1000, 'f', -1, 64))
// 	start = stop
// }
+102 −2
Original line number Diff line number Diff line
@@ -74,108 +74,204 @@ func TestGetSetMetric(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

	// start = time.Now()

	fmt.Println("Create valid Metric Store")
	ms, err := NewMetricStore(metricStore1Name, metricStoreAddr)
	if err != nil {
		t.Errorf("Unable to create Metric Store")
	}

	// logTimeLapse("Created Metric store: ")

	fmt.Println("Flush store metrics")
	ms.Flush()

	// logTimeLapse("Flush: ")

	fmt.Println("Get empty metric")
	lat, mean, err := ms.GetLastLatencyMetric("node1", "node2")
	if err == nil || lat != 0 || mean != 0 {
		t.Errorf("Net metric should not exist")
	}

	fmt.Println("Set latency metrics")
	// logTimeLapse("Get empty metric: ")

	fmt.Println("Set network metrics")
	err = ms.SetLatencyMetric("node1", "node2", 0, 1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node1", "node2", 0.1, 1.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node1", "node3", 1, 2)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node1", "node3", 1.1, 2.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node2", "node1", 2, 3)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node2", "node1", 2.1, 3.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node2", "node3", 3, 4)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node2", "node3", 3.1, 4.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node3", "node1", 4, 5)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node3", "node1", 4.5, 5.5)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node3", "node2", 5, 6)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node3", "node2", 5.5, 6.5)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node1", "node2", 6, 7)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node1", "node2", 6.1, 7.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node1", "node3", 7, 8)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node1", "node3", 7.1, 8.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node2", "node1", 8, 9)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node2", "node1", 8.1, 9.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node2", "node3", 9, 0)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node2", "node3", 9.1, 0.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node3", "node1", 0, 1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node3", "node1", 0.1, 1.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetLatencyMetric("node3", "node2", 1, 2)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}
	err = ms.SetTrafficMetric("node3", "node2", 1.1, 2.1)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}

	// logTimeLapse("Set network metrics: ")

	fmt.Println("Get latency metrics")
	fmt.Println("Get network metrics")
	lat, mean, err = ms.GetLastLatencyMetric("node1", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 6 || mean != 7 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err := ms.GetLastTrafficMetric("node1", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 6.1 || loss != 7.1 {
		t.Errorf("Invalid metric values")
	}
	lat, mean, err = ms.GetLastLatencyMetric("node1", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 7 || mean != 8 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err = ms.GetLastTrafficMetric("node1", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 7.1 || loss != 8.1 {
		t.Errorf("Invalid metric values")
	}
	lat, mean, err = ms.GetLastLatencyMetric("node2", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 8 || mean != 9 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err = ms.GetLastTrafficMetric("node2", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 8.1 || loss != 9.1 {
		t.Errorf("Invalid metric values")
	}
	lat, mean, err = ms.GetLastLatencyMetric("node2", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 9 || mean != 0 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err = ms.GetLastTrafficMetric("node2", "node3")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 9.1 || loss != 0.1 {
		t.Errorf("Invalid metric values")
	}
	lat, mean, err = ms.GetLastLatencyMetric("node3", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 0 || mean != 1 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err = ms.GetLastTrafficMetric("node3", "node1")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 0.1 || loss != 1.1 {
		t.Errorf("Invalid metric values")
	}
	lat, mean, err = ms.GetLastLatencyMetric("node3", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if lat != 1 || mean != 2 {
		t.Errorf("Invalid metric values")
	}
	tput, loss, err = ms.GetLastTrafficMetric("node3", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	} else if tput != 1.1 || loss != 2.1 {
		t.Errorf("Invalid metric values")
	}

	// logTimeLapse("Get network metrics: ")

	fmt.Println("Set event metric")
	err = ms.SetEventMetric("MOBILITY", "event1")
@@ -191,6 +287,8 @@ func TestGetSetMetric(t *testing.T) {
		t.Errorf("Unable to set event metric")
	}

	// logTimeLapse("Set event metrics: ")

	fmt.Println("Get event metrics")
	event, err := ms.GetLastEventMetric("MOBILITY")
	if err != nil {
@@ -211,5 +309,7 @@ func TestGetSetMetric(t *testing.T) {
		t.Errorf("Invalid metric values")
	}

	// logTimeLapse("Get event metrics: ")

	// t.Errorf("DONE")
}
+91 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"encoding/json"
	"errors"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const metricLatency = "latency"
const metricTraffic = "traffic"

// SetLatencyMetric
func (ms *MetricStore) SetLatencyMetric(src string, dest string, lat int32, mean int32) error {
	tags := map[string]string{"src": src, "dest": dest}
	fields := map[string]interface{}{"lat": lat, "mean": mean}
	return ms.SetMetric(metricLatency, tags, fields)
}

// GetLastLatencyMetric
func (ms *MetricStore) GetLastLatencyMetric(src string, dest string) (lat int32, mean int32, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err = errors.New("Store name not specified")
		return
	}

	// Get latest Latency metric
	tags := map[string]string{"src": src, "dest": dest}
	fields := []string{"lat", "mean"}
	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetMetric(metricLatency, tags, fields, "", "", 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// Take first & only values
	values := valuesArray[0]
	lat = JsonNumToInt32(values["lat"].(json.Number))
	mean = JsonNumToInt32(values["mean"].(json.Number))
	return
}

// SetTrafficMetric
func (ms *MetricStore) SetTrafficMetric(src string, dest string, tput float64, loss float64) error {
	tags := map[string]string{"src": src, "dest": dest}
	fields := map[string]interface{}{"tput": tput, "loss": loss}
	return ms.SetMetric(metricTraffic, tags, fields)
}

// GetLastTrafficMetric
func (ms *MetricStore) GetLastTrafficMetric(src string, dest string) (tput float64, loss float64, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err = errors.New("Store name not specified")
		return
	}

	// Get latest Net metric
	tags := map[string]string{"src": src, "dest": dest}
	fields := []string{"tput", "loss"}
	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetMetric(metricTraffic, tags, fields, "", "", 1)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// Take first & only values
	values := valuesArray[0]
	tput = JsonNumToFloat64(values["tput"].(json.Number))
	loss = JsonNumToFloat64(values["loss"].(json.Number))
	return
}