Commit c2dc3943 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

influxdb metrics store initial implementation

parent cde8b910
Loading
Loading
Loading
Loading
+10 −0
Original line number Diff line number Diff line
module github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store

go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e
)

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
+11 −0
Original line number Diff line number Diff line
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e h1:txQltCyjXAqVVSZDArPEhUTg35hKwVIuXwtQo7eAMNQ=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/sirupsen/logrus v1.4.1 h1:GL2rEmy6nsikmW0r8opw9JIRScdMF5hA8cOYLH7In1k=
github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q=
github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33 h1:I6FyU15t786LL7oL/hn43zqTuEGr4PN7F4XJ1p4E3Y8=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
+192 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"errors"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"

	_ "github.com/influxdata/influxdb1-client" // this is important because of the bug in go mod
	influxclient "github.com/influxdata/influxdb1-client/v2"
)

const dbMaxRetryCount = 2
const (
	metricNet   = "netmet"
	metricEvent = "events"
)

// MetricStore - Implements a metric store
type MetricStore struct {
	name      string
	addr      string
	connected bool
	client    *influxclient.Client
}

// NewMetricStore - Creates and initialize a Metric Store instance
func NewMetricStore(name string, addr string) (ms *MetricStore, err error) {
	ms = new(MetricStore)
	ms.name = name

	// Connect to Influx DB
	for retry := 0; !ms.connected && retry <= dbMaxRetryCount; retry++ {
		err = ms.connectDB(addr)
		if err != nil {
			log.Warn("Failed to connect to InfluxDB. Retrying... Error: ", err)
		}
	}
	if err != nil {
		return nil, err
	}

	// Create Store DB if it does not exist
	q := influxclient.NewQuery("CREATE DATABASE "+name, "", "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
	}
	log.Info(response.Results)

	log.Info("Successfully connected to Influx DB")
	return ms, nil
}

func (ms *MetricStore) connectDB(addr string) error {
	if addr == "" {
		ms.addr = "http://influxdb:8086"
	} else {
		ms.addr = addr
	}
	log.Debug("InfluxDB Connector connecting to ", ms.addr)

	client, err := influxclient.NewHTTPClient(influxclient.HTTPConfig{Addr: ms.addr})
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", ms.addr)
		return err
	}
	defer client.Close()

	_, version, err := client.Ping(1000 * time.Millisecond)
	if err != nil {
		log.Error("InfluxDB Connector unable to connect ", ms.addr)
		return err
	}

	ms.client = &client
	ms.connected = true
	log.Info("InfluxDB Connector connected to ", ms.addr, " version: ", version)
	return nil
}

// Flush
func (ms *MetricStore) Flush() {

	// Create Store DB if it does not exist
	q := influxclient.NewQuery("DROP SERIES FROM /.*/", ms.name, "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
	}
	log.Info(response.Results)
}

// SetMetric - Generic metric setter
func (ms *MetricStore) SetMetric(metric string, tags map[string]string, fields map[string]interface{}) error {
	// Create a new point batch
	bp, _ := influxclient.NewBatchPoints(influxclient.BatchPointsConfig{
		Database:  ms.name,
		Precision: "us",
	})

	// Create a point and add to batch
	pt, err := influxclient.NewPoint(metric, tags, fields)
	if err != nil {
		log.Error("Failed to create point with error: ", err)
		return err
	}
	bp.AddPoint(pt)

	// Write the batch
	err = (*ms.client).Write(bp)
	if err != nil {
		log.Error("Failed to write point with error: ", err)
		return err
	}
	return nil
}

// GetMetric - Generic metric getter
func (ms *MetricStore) GetMetric(metric string) {
	q := influxclient.NewQuery("SELECT * FROM "+metric, ms.name, "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
	}
	log.Error(response.Results)
}

// SetNetMetric
func (ms *MetricStore) SetNetMetric(src string, dest string, lat int32, tput int32, loss int64) error {
	tags := map[string]string{
		"src":  src,
		"dest": dest,
	}
	fields := map[string]interface{}{
		"lat":  lat,
		"tput": tput,
		"loss": loss,
	}
	return ms.SetMetric(metricNet, tags, fields)
}

// GetNetMetric
func (ms *MetricStore) GetLastNetMetric(src string, dest string) (lat int32, tput int32, loss int64, err error) {
	query := "SELECT lat,tput,loss FROM " + metricNet + " WHERE src='" + src + "' AND dest='" + dest + "' ORDER BY desc LIMIT 1"
	q := influxclient.NewQuery(query, ms.name, "")
	response, err := (*ms.client).Query(q)
	if err != nil {
		log.Error("Query failed with error: ", err.Error())
		return lat, tput, loss, err
	}
	log.Error(response.Results)

	if len(response.Results[0].Series) == 0 {
		err = errors.New("Query returned no results")
		log.Error("Query failed with error: ", err.Error())
		return lat, tput, loss, err
	}

	// Read results
	// response.Results[]

	return lat, tput, loss, nil
}

// SetEventMetric
func (ms *MetricStore) SetEventMetric(typ string, target string, desc string) error {
	tags := map[string]string{
		"type":   typ,
		"target": target,
	}
	fields := map[string]interface{}{
		"description": desc,
	}
	return ms.SetMetric(metricEvent, tags, fields)
}
+87 −0
Original line number Diff line number Diff line
/*
 * Copyright (c) 2019  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package metricstore

import (
	"fmt"
	"testing"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
)

const metricStoreName string = "metricStore"
const metricStoreAddr string = "http://localhost:30386"

func TestNewMetricStore(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

	// Keep this one first...
	fmt.Println("Invalid Metric Store address")
	_, err := NewMetricStore(metricStoreName, "ExpectedFailure-InvalidStoreAddr")
	if err == nil {
		t.Errorf("Should report error on invalid store addr")
	}

	fmt.Println("Create valid Metric Store")
	_, err = NewMetricStore(metricStoreName, metricStoreAddr)
	if err != nil {
		t.Errorf("Unable to create Metric Store")
	}

	// t.Errorf("DONE")
}

func TestGetSetMetric(t *testing.T) {
	fmt.Println("--- ", t.Name())
	log.MeepTextLogInit(t.Name())

	fmt.Println("Create valid Metric Store")
	ms, err := NewMetricStore(metricStoreName, metricStoreAddr)
	if err != nil {
		t.Errorf("Unable to create Metric Store")
	}

	fmt.Println("Flush store metrics")
	ms.Flush()

	fmt.Println("Get empty metric")
	_, _, _, err = ms.GetLastNetMetric("node1", "node2")
	if err == nil {
		t.Errorf("Net metric should not exist")
	}

	fmt.Println("Set net metric")
	err = ms.SetNetMetric("node1", "node2", 1, 2, 3)
	if err != nil {
		t.Errorf("Unable to set net metric")
	}

	fmt.Println("Set event metric")
	err = ms.SetEventMetric("MOBILITY", "node1", "UE Mobility event")
	if err != nil {
		t.Errorf("Unable to set event metric")
	}

	fmt.Println("Get metric")
	_, _, _, err = ms.GetLastNetMetric("node1", "node2")
	if err != nil {
		t.Errorf("Net metric should exist")
	}

	// t.Errorf("DONE")
}