Commit f549bc39 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

added session metrics

parent 54e3201d
Loading
Loading
Loading
Loading
+55 −26
Original line number Diff line number Diff line
@@ -35,6 +35,7 @@ import (
const defaultInfluxDBAddr = "http://meep-influxdb.default.svc.cluster.local:8086"
const dbMaxRetryCount = 2

const MetricsDbDisabled = "disabled"
const metricsDb = 0
const metricsKey = "metric-store:"

@@ -50,7 +51,6 @@ type MetricStore struct {
	namespace      string
	baseKey        string
	addr           string
	connected      bool
	influxClient   *influx.Client
	redisClient    *redis.Connector
	snapshotTicker *time.Ticker
@@ -70,15 +70,18 @@ func NewMetricStore(name string, namespace string, influxAddr string, redisAddr
	ms.baseKey = dkm.GetKeyRoot(namespace) + metricsKey

	// Connect to Redis DB
	if redisAddr != MetricsDbDisabled {
		ms.redisClient, err = redis.NewConnector(redisAddr, metricsDb)
		if err != nil {
			log.Error("Failed connection to Metrics redis DB. Error: ", err)
			return nil, err
		}
		log.Info("Connected to Metrics Redis DB")
	}

	// Connect to Influx DB
	for retry := 0; !ms.connected && retry <= dbMaxRetryCount; retry++ {
	if influxAddr != MetricsDbDisabled {
		for retry := 0; ms.influxClient == nil && retry <= dbMaxRetryCount; retry++ {
			err = ms.connectInfluxDB(influxAddr)
			if err != nil {
				log.Warn("Failed to connect to InfluxDB. Retrying... Error: ", err)
@@ -88,6 +91,7 @@ func NewMetricStore(name string, namespace string, influxAddr string, redisAddr
			return nil, err
		}
		log.Info("Connected to Metrics Influx DB")
	}

	// Set store to use
	err = ms.SetStore(name)
@@ -96,7 +100,7 @@ func NewMetricStore(name string, namespace string, influxAddr string, redisAddr
		return nil, err
	}

	log.Info("Successfully connected to Influx DB")
	log.Info("Successfully create Metric Store")
	return ms, nil
}

@@ -122,7 +126,6 @@ func (ms *MetricStore) connectInfluxDB(addr string) error {
	}

	ms.influxClient = &client
	ms.connected = true
	log.Info("InfluxDB Connector connected to ", ms.addr, " version: ", version)
	return nil
}
@@ -137,6 +140,7 @@ func (ms *MetricStore) SetStore(name string) error {
		storeName = strings.Replace(ms.namespace+"_"+name, "-", "_", -1)

		// Create new DB if necessary
		if ms.influxClient != nil {
			q := influx.NewQuery("CREATE DATABASE "+storeName, "", "")
			_, err := (*ms.influxClient).Query(q)
			if err != nil {
@@ -144,8 +148,10 @@ func (ms *MetricStore) SetStore(name string) error {
				return err
			}
		}
	}

	// Update store name
	log.Info("Store name set to: ", storeName)
	ms.name = storeName
	return nil
}
@@ -158,16 +164,20 @@ func (ms *MetricStore) Flush() {
	}

	// Flush Influx DB
	if ms.influxClient != nil {
		q := influx.NewQuery("DROP SERIES FROM /.*/", ms.name, "")
		response, err := (*ms.influxClient).Query(q)
		if err != nil {
			log.Error("Query failed with error: ", err.Error())
		}
		log.Info(response.Results)
	}

	// Flush Redis DB
	if ms.redisClient != nil {
		ms.redisClient.DBFlush(ms.baseKey + NetMetName)
	}
}

// Copy
func (ms *MetricStore) Copy(src string, dst string) error {
@@ -177,6 +187,11 @@ func (ms *MetricStore) Copy(src string, dst string) error {
		log.Error("Error: ", err.Error())
		return err
	}
	if ms.influxClient == nil {
		err := errors.New("Not connected to Influx DB")
		log.Error("Error: ", err.Error())
		return err
	}

	// Create store name using format: '<namespace>_<name>'
	// Replace dashes with underscores
@@ -215,6 +230,9 @@ func (ms *MetricStore) SetInfluxMetric(metricList []Metric) error {
	if ms.name == "" {
		return errors.New("Store name not specified")
	}
	if ms.influxClient == nil {
		return errors.New("Not connected to Influx DB")
	}

	// Create a new point batch
	bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
@@ -247,6 +265,9 @@ func (ms *MetricStore) GetInfluxMetric(metric string, tags map[string]string, fi
	if ms.name == "" {
		return values, errors.New("Store name not specified")
	}
	if ms.influxClient == nil {
		return values, errors.New("Not connected to Influx DB")
	}

	// Create query

@@ -328,6 +349,10 @@ func (ms *MetricStore) SetRedisMetric(metric string, tagStr string, fields map[s
		err = errors.New("Store name not specified")
		return
	}
	if ms.redisClient == nil {
		err = errors.New("Redis metrics DB disabled")
		return
	}

	// Store data
	key := ms.baseKey + metric + ":" + tagStr
@@ -347,6 +372,10 @@ func (ms *MetricStore) GetRedisMetric(metric string, tagStr string) (values []ma
		err := errors.New("Store name not specified")
		return values, err
	}
	if ms.redisClient == nil {
		err = errors.New("Redis metrics DB disabled")
		return values, err
	}

	// Get latest metrics
	key := ms.baseKey + metric + ":" + tagStr
+119 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"errors"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const SesMetName = "session"
const SesMetProvider = "provider"
const SesMetUser = "user"
const SesMetType = "type"
const SesMetSid = "sid"
const SesMetSbox = "sbox"
const SesMetErrType = "errtype"
const SesMetDesc = "desc"

// Session metric types
const (
	SesMetTypeLogin  = "login"
	SesMetTypeLogout = "logout"
	SesMetTypeError  = "error"
)

// Session metric error types
const (
	SesMetErrTypeOauth       = "oauth"
	SesMetErrTypeMaxSessions = "maxsessions"
)

type SessionMetric struct {
	Time        interface{}
	Provider    string
	User        string
	SessionId   string
	Sandbox     string
	ErrType     string
	Description string
}

// SetSessionMetric
func (ms *MetricStore) SetSessionMetric(typ string, sm SessionMetric) error {
	metricList := make([]Metric, 1)
	metric := &metricList[0]
	metric.Name = SesMetName
	metric.Tags = map[string]string{SesMetType: typ}
	metric.Fields = map[string]interface{}{
		SesMetProvider: sm.Provider,
		SesMetUser:     sm.User,
		SesMetSid:      sm.SessionId,
		SesMetSbox:     sm.Sandbox,
		SesMetErrType:  sm.ErrType,
		SesMetDesc:     sm.Description,
	}
	return ms.SetInfluxMetric(metricList)
}

// GetSessionMetric
func (ms *MetricStore) GetSessionMetric(typ string, duration string, count int) (metrics []SessionMetric, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err = errors.New("Store name not specified")
		return
	}

	// Get Session metrics
	tags := map[string]string{}
	if typ != "" {
		tags[SesMetType] = typ
	}
	fields := []string{SesMetProvider, SesMetUser, SesMetSid, SesMetSbox, SesMetErrType, SesMetDesc}
	var valuesArray []map[string]interface{}
	valuesArray, err = ms.GetInfluxMetric(SesMetName, tags, fields, duration, count)
	if err != nil {
		log.Error("Failed to retrieve metrics with error: ", err.Error())
		return
	}

	// Format event metrics
	metrics = make([]SessionMetric, len(valuesArray))
	for index, values := range valuesArray {
		metrics[index].Time = values[NetMetTime]
		if val, ok := values[SesMetProvider].(string); ok {
			metrics[index].Provider = val
		}
		if val, ok := values[SesMetUser].(string); ok {
			metrics[index].User = val
		}
		if val, ok := values[SesMetSid].(string); ok {
			metrics[index].SessionId = val
		}
		if val, ok := values[SesMetSbox].(string); ok {
			metrics[index].Sandbox = val
		}
		if val, ok := values[SesMetErrType].(string); ok {
			metrics[index].ErrType = val
		}
		if val, ok := values[SesMetDesc].(string); ok {
			metrics[index].Description = val
		}
	}
	return
}
+142 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"fmt"
	"testing"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const sessionStoreName string = "session-store"
const sessionStoreNamespace string = "common"
const sessionStoreInfluxAddr string = "http://localhost:30986"
const sessionStoreRedisAddr string = MetricsDbDisabled

func TestSessionMetricsGetSet(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

	fmt.Println("Create valid Metric Store")
	ms, err := NewMetricStore(sessionStoreName, sessionStoreNamespace, sessionStoreInfluxAddr, sessionStoreRedisAddr)
	if err != nil {
		t.Fatalf("Unable to create Metric Store")
	}

	fmt.Println("Flush store metrics")
	ms.Flush()

	fmt.Println("Set session metric")
	err = ms.SetSessionMetric(SesMetTypeLogin, SessionMetric{nil, "provider1", "user1", "sid1", "sbox1", "", "session1 description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}
	err = ms.SetSessionMetric(SesMetTypeLogout, SessionMetric{nil, "provider1", "user1", "sid1", "sbox1", "", "session1 description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}
	err = ms.SetSessionMetric(SesMetTypeError, SessionMetric{nil, "provider2", "2.2.2.2", "", "", SesMetErrTypeOauth, "session2 error description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}
	err = ms.SetSessionMetric(SesMetTypeError, SessionMetric{nil, "provider3", "3.3.3.3", "", "", SesMetErrTypeMaxSessions, "session3 error description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}
	err = ms.SetSessionMetric(SesMetTypeLogin, SessionMetric{nil, "provider4", "user4", "sid4", "sbox4", "", "session4 description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}
	err = ms.SetSessionMetric(SesMetTypeLogout, SessionMetric{nil, "provider4", "user4", "sid4", "sbox4", "", "session4 description"})
	if err != nil {
		t.Fatalf("Unable to set session metric")
	}

	fmt.Println("Get session metrics")
	sml, err := ms.GetSessionMetric(SesMetTypeLogin, "1ms", 0)
	if err != nil || len(sml) != 0 {
		t.Fatalf("No metrics should be found in the last 1 ms")
	}
	sml, err = ms.GetSessionMetric(SesMetTypeLogin, "", 1)
	if err != nil || len(sml) != 1 {
		t.Fatalf("Failed to get metric")
	}
	if !validateSessionMetric(sml[0], "provider1", "user1", "sid1", "sbox1", "", "session1 description") {
		t.Fatalf("Invalid event metric")
	}
	sml, err = ms.GetSessionMetric(SesMetTypeLogin, "", 0)
	if err != nil || len(sml) != 2 {
		t.Fatalf("Failed to get metric")
	}
	if !validateSessionMetric(sml[0], "provider1", "user1", "sid1", "sbox1", "", "session1 description") {
		t.Fatalf("Invalid event metric")
	}
	if !validateSessionMetric(sml[1], "provider4", "user4", "sid4", "sbox4", "", "session4 description") {
		t.Fatalf("Invalid event metric")
	}
	sml, err = ms.GetSessionMetric(SesMetTypeLogout, "", 0)
	if err != nil || len(sml) != 2 {
		t.Fatalf("Failed to get metric")
	}
	if !validateSessionMetric(sml[0], "provider1", "user1", "sid1", "sbox1", "", "session1 description") {
		t.Fatalf("Invalid event metric")
	}
	if !validateSessionMetric(sml[1], "provider4", "user4", "sid4", "sbox4", "", "session4 description") {
		t.Fatalf("Invalid event metric")
	}
	sml, err = ms.GetSessionMetric(SesMetTypeError, "", 0)
	if err != nil || len(sml) != 2 {
		t.Fatalf("Failed to get metric")
	}
	if !validateSessionMetric(sml[0], "provider2", "2.2.2.2", "", "", SesMetErrTypeOauth, "session2 error description") {
		t.Fatalf("Invalid event metric")
	}
	if !validateSessionMetric(sml[1], "provider3", "3.3.3.3", "", "", SesMetErrTypeMaxSessions, "session3 error description") {
		t.Fatalf("Invalid event metric")
	}

	// t.Fatalf("DONE")
}

func validateSessionMetric(sm SessionMetric, provider string, user string, sid string, sbox string, errType string, description string) bool {
	if sm.Provider != provider {
		fmt.Println("sm.Provider[" + sm.Provider + "] != provider [" + provider + "]")
		return false
	}
	if sm.User != user {
		fmt.Println("sm.User[" + sm.User + "] != user [" + user + "]")
		return false
	}
	if sm.SessionId != sid {
		fmt.Println("sm.SessionId[" + sm.SessionId + "] != sid [" + sid + "]")
		return false
	}
	if sm.Sandbox != sbox {
		fmt.Println("sm.Sandbox[" + sm.Sandbox + "] != sbox [" + sbox + "]")
		return false
	}
	if sm.ErrType != errType {
		fmt.Println("sm.ErrType[" + sm.ErrType + "] != errType [" + errType + "]")
		return false
	}
	if sm.Description != description {
		fmt.Println("sm.Description[" + sm.Description + "] != description [" + description + "]")
		return false
	}
	return true
}