Commit a058d3af authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

multi-point write to influxdb + integration fixes

parent fd3f191b
Loading
Loading
Loading
Loading
+1 −2
Original line number Diff line number Diff line
@@ -370,14 +370,13 @@ func ceActivateScenario(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Set Metrics Store & Flush entries
	// Set Metrics Store
	err = metricStore.SetStore(scenarioName)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	metricStore.Flush()

	// Activate scenario & publish
	err = activeModel.SetScenario(scenario)
+84 −49
Original line number Diff line number Diff line
@@ -75,6 +75,13 @@ type NetworkRegistration struct {

// Init - Metrics engine initialization
func Init() (err error) {
	// Connect to Metric Store
	metricStore, err = ms.NewMetricStore("", influxDBAddr, redisAddr)
	if err != nil {
		log.Error("Failed connection to Redis: ", err)
		return err
	}

	// Listen for model updates
	activeModel, err = mod.NewModel(redisAddr, moduleName, "activeScenario")
	if err != nil {
@@ -129,60 +136,32 @@ func processActiveScenarioUpdate(event string) {
}

func activateScenario() {
	// Connect to Metric Store
	var err error
	metricStore, err = ms.NewMetricStore(activeScenarioName, influxDBAddr, redisAddr)
	// Set Metrics Store
	err := metricStore.SetStore(activeScenarioName)
	if err != nil {
		log.Error("Failed connection to Influx: ", err)
		log.Error("Failed to set store with error: " + err.Error())
		return
	}
	if metricStore == nil {
		log.Error("MetricStore creation error")

	// Flush metric store entries on activation
	metricStore.Flush()

	// Start snapshot thread
	err = metricStore.StartSnapshotThread()
	if err != nil {
		log.Error("Failed to start snapshot thread: " + err.Error())
		return
	}
}

func terminateScenario(name string) {
	if name != "" {
		metricStore = nil
	}
}

func formatEventMetrics(metricList []map[string]interface{}, eventMetricList interface{}) {
	emList := eventMetricList.(*[]EventMetric)
	*emList = make([]EventMetric, len(metricList))
	for index, metric := range metricList {
		em := &((*emList)[index])
		em.Time = metric["time"].(string)
		if metric[ms.EvMetEvent] != nil {
			if val, ok := metric[ms.EvMetEvent].(string); ok {
				em.Event = val
			}
		}
	}
}
	// Terminate snapshot thread
	metricStore.StopSnapshotThread()

func formatNetworkMetrics(metricList []map[string]interface{}, networkMetricList interface{}) {
	nmList := networkMetricList.(*[]NetworkMetric)
	*nmList = make([]NetworkMetric, len(metricList))
	for index, metric := range metricList {
		nm := &((*nmList)[index])
		nm.Time = metric["time"].(string)
		if metric[ms.NetMetLatency] != nil {
			nm.Lat = ms.JsonNumToInt32(metric[ms.NetMetLatency].(json.Number))
		}
		if metric[ms.NetMetULThroughput] != nil {
			nm.Ul = ms.JsonNumToFloat64(metric[ms.NetMetULThroughput].(json.Number))
		}
		if metric[ms.NetMetDLThroughput] != nil {
			nm.Dl = ms.JsonNumToFloat64(metric[ms.NetMetDLThroughput].(json.Number))
		}
		if metric[ms.NetMetULPktLoss] != nil {
			nm.Ulos = ms.JsonNumToFloat64(metric[ms.NetMetULPktLoss].(json.Number))
		}
		if metric[ms.NetMetDLPktLoss] != nil {
			nm.Dlos = ms.JsonNumToFloat64(metric[ms.NetMetDLPktLoss].(json.Number))
		}
	// Set Metrics Store
	err := metricStore.SetStore("")
	if err != nil {
		log.Error(err.Error())
	}
}

@@ -246,7 +225,16 @@ func mePostEventQuery(w http.ResponseWriter, r *http.Request) {
	var response EventMetricList
	response.Name = "event metrics"
	response.Columns = append(params.Fields, "time")
	formatEventMetrics(valuesArray, &response.Values)
	response.Values = make([]EventMetric, len(valuesArray))
	for index, values := range valuesArray {
		metric := &response.Values[index]
		metric.Time = values["time"].(string)
		if values[ms.EvMetEvent] != nil {
			if val, ok := values[ms.EvMetEvent].(string); ok {
				metric.Event = val
			}
		}
	}

	jsonResponse, err := json.Marshal(response)
	if err != nil {
@@ -318,7 +306,26 @@ func mePostNetworkQuery(w http.ResponseWriter, r *http.Request) {
	var response NetworkMetricList
	response.Name = "network metrics"
	response.Columns = append(params.Fields, "time")
	formatNetworkMetrics(valuesArray, &response.Values)
	response.Values = make([]NetworkMetric, len(valuesArray))
	for index, values := range valuesArray {
		metric := &response.Values[index]
		metric.Time = values["time"].(string)
		if values[ms.NetMetLatency] != nil {
			metric.Lat = ms.JsonNumToInt32(values[ms.NetMetLatency].(json.Number))
		}
		if values[ms.NetMetULThroughput] != nil {
			metric.Ul = ms.JsonNumToFloat64(values[ms.NetMetULThroughput].(json.Number))
		}
		if values[ms.NetMetDLThroughput] != nil {
			metric.Dl = ms.JsonNumToFloat64(values[ms.NetMetDLThroughput].(json.Number))
		}
		if values[ms.NetMetULPktLoss] != nil {
			metric.Ulos = ms.JsonNumToFloat64(values[ms.NetMetULPktLoss].(json.Number))
		}
		if values[ms.NetMetDLPktLoss] != nil {
			metric.Dlos = ms.JsonNumToFloat64(values[ms.NetMetDLPktLoss].(json.Number))
		}
	}

	jsonResponse, err := json.Marshal(response)
	if err != nil {
@@ -528,7 +535,16 @@ func processEventNotification(subsId string) {

		if err == nil {
			response.Columns = append(eventRegistration.params.EventQueryParams.Fields, "time")
			formatEventMetrics(valuesArray, &response.Values)
			response.Values = make([]clientv2.EventMetric, len(valuesArray))
			for index, values := range valuesArray {
				metric := &response.Values[index]
				metric.Time = values["time"].(string)
				if values[ms.EvMetEvent] != nil {
					if val, ok := values[ms.EvMetEvent].(string); ok {
						metric.Event = val
					}
				}
			}
		}
	}

@@ -559,7 +575,26 @@ func processNetworkNotification(subsId string) {

		if err == nil {
			response.Columns = append(networkRegistration.params.NetworkQueryParams.Fields, "time")
			formatNetworkMetrics(valuesArray, &response.Values)
			response.Values = make([]clientv2.NetworkMetric, len(valuesArray))
			for index, values := range valuesArray {
				metric := &response.Values[index]
				metric.Time = values["time"].(string)
				if values[ms.NetMetLatency] != nil {
					metric.Lat = ms.JsonNumToInt32(values[ms.NetMetLatency].(json.Number))
				}
				if values[ms.NetMetULThroughput] != nil {
					metric.Ul = ms.JsonNumToFloat64(values[ms.NetMetULThroughput].(json.Number))
				}
				if values[ms.NetMetDLThroughput] != nil {
					metric.Dl = ms.JsonNumToFloat64(values[ms.NetMetDLThroughput].(json.Number))
				}
				if values[ms.NetMetULPktLoss] != nil {
					metric.Ulos = ms.JsonNumToFloat64(values[ms.NetMetULPktLoss].(json.Number))
				}
				if values[ms.NetMetDLPktLoss] != nil {
					metric.Dlos = ms.JsonNumToFloat64(values[ms.NetMetDLPktLoss].(json.Number))
				}
			}
		}
	}

+3 −1
Original line number Diff line number Diff line
@@ -270,12 +270,14 @@ func (u *destination) logRxTx() {
	// Store network metric
	srcDest := u.hostName + ":" + u.remoteName
	var metric ms.NetworkMetric
	metric.Src = u.remoteName
	metric.Dst = u.hostName
	semLatencyMap.Lock()
	metric.Lat = latestLatencyResultsMap[srcDest]
	semLatencyMap.Unlock()
	metric.UlTput = tput
	metric.UlLoss = loss
	err = metricStore.SetCachedNetworkMetric(u.remoteName, u.hostName, metric)
	err = metricStore.SetCachedNetworkMetric(metric)
	if err != nil {
		log.Error("Failed to set network metric")
	}
+7 −4
Original line number Diff line number Diff line
@@ -32,10 +32,13 @@ type EventMetric struct {
}

// SetEventMetric
func (ms *MetricStore) SetEventMetric(eventType string, metric EventMetric) error {
	tags := map[string]string{EvMetType: eventType}
	fields := map[string]interface{}{EvMetEvent: metric.Event}
	return ms.SetInfluxMetric(EvMetName, tags, fields)
func (ms *MetricStore) SetEventMetric(eventType string, em EventMetric) error {
	metricList := make([]Metric, 1)
	metric := &metricList[0]
	metric.Name = EvMetName
	metric.Tags = map[string]string{EvMetType: eventType}
	metric.Fields = map[string]interface{}{EvMetEvent: em.Event}
	return ms.SetInfluxMetric(metricList)
}

// GetEventMetric
+54 −18
Original line number Diff line number Diff line
@@ -37,6 +37,12 @@ const dbMaxRetryCount = 2
const metricsDb = 0
const moduleMetrics = "metric-store"

type Metric struct {
	Name   string
	Tags   map[string]string
	Fields map[string]interface{}
}

// MetricStore - Implements a metric store
type MetricStore struct {
	name         string
@@ -44,6 +50,7 @@ type MetricStore struct {
	connected    bool
	influxClient *influx.Client
	redisClient  *redis.Connector
	snapTicker   *time.Ticker
}

// NewMetricStore - Creates and initialize a Metric Store instance
@@ -115,7 +122,7 @@ func (ms *MetricStore) SetStore(name string) error {
	// Remove dashes from name
	storeName := strings.Replace(name, "-", "", -1)

	// Set current store. Create new DB if necessary.
	// Create new DB if necessary.
	if storeName != "" {
		q := influx.NewQuery("CREATE DATABASE "+storeName, "", "")
		_, err := (*ms.influxClient).Query(q)
@@ -123,8 +130,10 @@ func (ms *MetricStore) SetStore(name string) error {
			log.Error("Query failed with error: ", err.Error())
			return err
		}
		ms.name = storeName
	}

	// Set current store
	ms.name = storeName
	return nil
}

@@ -148,38 +157,35 @@ func (ms *MetricStore) Flush() {
}

// SetInfluxMetric - Generic metric setter
func (ms *MetricStore) SetInfluxMetric(metric string, tags map[string]string, fields map[string]interface{}) error {
func (ms *MetricStore) SetInfluxMetric(metricList []Metric) error {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return err
	}

	// start = time.Now()

	// Create a new point batch
	bp, _ := influx.NewBatchPoints(influx.BatchPointsConfig{
		Database:  ms.name,
		Precision: "us",
	})

	// Create a point and add to batch
	pt, err := influx.NewPoint(metric, tags, fields)
	// Create & add points to batch
	for _, metric := range metricList {
		pt, err := influx.NewPoint(metric.Name, metric.Tags, metric.Fields)
		if err != nil {
			log.Error("Failed to create point with error: ", err)
			return err
		}
		bp.AddPoint(pt)
	}

	// Write the batch
	err = (*ms.influxClient).Write(bp)
	err := (*ms.influxClient).Write(bp)
	if err != nil {
		log.Error("Failed to write point with error: ", err)
		return err
	}

	// logTimeLapse("SetMetric duration: ")

	return nil
}

@@ -187,8 +193,7 @@ func (ms *MetricStore) SetInfluxMetric(metric string, tags map[string]string, fi
func (ms *MetricStore) GetInfluxMetric(metric string, tags map[string]string, fields []string, duration string, count int) (values []map[string]interface{}, err error) {
	// Make sure we have set a store
	if ms.name == "" {
		err := errors.New("Store name not specified")
		return values, err
		return values, errors.New("Store name not specified")
	}

	// Create query
@@ -304,6 +309,9 @@ func (ms *MetricStore) getMetricsEntryHandler(key string, fields map[string]stri
		values[k] = v
	}

	// Add key to returned values
	values["key"] = key

	// Append values list to data
	data := userData.(*[]map[string]interface{})
	*data = append(*data, values)
@@ -311,8 +319,36 @@ func (ms *MetricStore) getMetricsEntryHandler(key string, fields map[string]stri
	return nil
}

func (ms *MetricStore) StartSnapshotThread() error {
	// Make sure we have set a store
	if ms.name == "" {
		return errors.New("Store name not specified")
	}
	// Make sure ticker is not already running
	if ms.snapTicker != nil {
		return errors.New("ticker already running")
	}

	// Create new ticker and start snapshot thread
	ms.snapTicker = time.NewTicker(time.Second)
	go func() {
		for range ms.snapTicker.C {
			ms.takeNetworkMetricSnapshot()
		}
	}()

	return nil
}

func (ms *MetricStore) StopSnapshotThread() {
	if ms.snapTicker != nil {
		ms.snapTicker.Stop()
		ms.snapTicker = nil
	}
}

// func logTimeLapse(logStr string) {
// 	stop := time.Now()
// 	log.Debug(logStr, strconv.FormatFloat(stop.Sub(start).Seconds()*1000, 'f', 3, 64), " ms")
// 	log.Debug("TIME: ", logStr, " ", strconv.FormatFloat(stop.Sub(start).Seconds()*1000, 'f', 3, 64), " ms")
// 	start = stop
// }
Loading