Commit 0ef4f944 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

added latency & traffic metric reporting in tc-sidecar + flush...

added latency & traffic metric reporting in tc-sidecar + flush scenario-specific influx DB on activation
parent 5ae3db6e
Loading
Loading
Loading
Loading
+7 −4
Original line number Diff line number Diff line
@@ -52,9 +52,9 @@ var rc *redis.Connector
var activeModel *mod.Model
var metricStore *ms.MetricStore

var couchDBAddr = "http://meep-couchdb-svc-couchdb:5984/"
var redisDBAddr = "meep-redis-master:6379"
var influxDBAddr = "http://influxdb:8086"
var couchDBAddr string = "http://meep-couchdb-svc-couchdb:5984/"
var redisDBAddr string = "meep-redis-master:6379"
var influxDBAddr string = "http://meep-influxdb:8086"

func getCorePodsList() map[string]bool {

@@ -370,11 +370,14 @@ func ceActivateScenario(w http.ResponseWriter, r *http.Request) {
		return
	}

	// Set Metrics Store
	// Set Metrics Store & Flush entries
	err = metricStore.SetStore(scenarioName)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	metricStore.Flush()

	// Activate scenario & publish
	err = activeModel.SetScenario(scenario)
+1 −0
Original line number Diff line number Diff line
@@ -49,6 +49,7 @@ func TestCtrlEngine(t *testing.T) {

	fmt.Println("CtrlEngineInit()")
	couchDBAddr = "http://localhost:30985/"
	influxDBAddr = "http://localhost:30986"
	redisDBAddr = "localhost:30380"
	mod.DbAddress = redisDBAddr
	err := CtrlEngineInit()
+109 −133
Original line number Diff line number Diff line
@@ -25,9 +25,10 @@ import (
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
)

const moduleMetrics string = "metrics"

type history struct {
	received int
	lost     int
@@ -37,7 +38,9 @@ type history struct {

type historyRx struct {
	time      time.Time
	rcvedBytes int
	rxBytes   int
	rxPkt     int
	rxPktDrop int
}

type destination struct {
@@ -47,8 +50,8 @@ type destination struct {
	remoteName string
	ifbNumber  string
	history    *history
	historyRx    *historyRx
	historyLogRx *historyRx
	prevRx     *historyRx
	prevRxLog  *historyRx
}

type stat struct {
@@ -61,8 +64,6 @@ type stat struct {
	stddev  time.Duration
}

const moduleMetrics string = "metrics"

func (u *destination) ping(pinger *Pinger) {
	rtt, err := pinger.Ping(u.remote, opts.timeout)
	if err != nil {
@@ -83,7 +84,7 @@ func (u *destination) addResult(rtt time.Duration, err error) {
	s.mtx.Unlock()
}

func (u *destination) compute(rc *redis.Connector) (st stat) {
func (u *destination) compute() (st stat) {
	s := u.history
	s.mtx.RLock()
	defer s.mtx.RUnlock()
@@ -130,6 +131,7 @@ func (u *destination) compute(rc *redis.Connector) (st stat) {
		stddevNum += math.Pow(float64(rtt-st.mean), 2)
	}
	if size > 0 {
		// avg is only of last 50 measurements as only the last 50 durations are kept
		st.mean = time.Duration(float64(total) / float64(size))
		st.stddev = time.Duration(math.Sqrt(stddevNum / float64(size)))
	} else {
@@ -137,13 +139,22 @@ func (u *destination) compute(rc *redis.Connector) (st stat) {
		st.stddev = 0
	}

	// avg is only of last 50 measurements as only the last 50 durations are kept
	// log.Info("Measurements log for ", u.remote, " : ", st.last, ", avg: ", st.mean)
	// Format latency measurement
	lat := int32(math.Round(float64(st.last) / 1000000.0))
	mean := int32(math.Round(float64(st.mean) / 1000000.0))

	// Store latency metric
	err := metricStore.SetLatencyMetric(u.hostName, u.remoteName, lat, mean)
	if err != nil {
		log.Error("Failed to set latency metric")
	}

	// Log measurment
	log.WithFields(log.Fields{
		"meep.log.component":      "sidecar",
		"meep.log.msgType":        "latency",
		"meep.log.latency-latest": int(math.Round(float64(st.last) / 1000000.0)),
		"meep.log.latency-avg":    int(math.Round(float64(st.mean) / 1000000.0)),
		"meep.log.latency-latest": lat,
		"meep.log.latency-avg":    mean,
		"meep.log.src":            u.hostName,
		"meep.log.dest":           u.remoteName,
	}).Info("Measurements log")
@@ -151,8 +162,11 @@ func (u *destination) compute(rc *redis.Connector) (st stat) {
	return
}

func (u *destination) processRxTx(rc *redis.Connector) {
func (u *destination) processRxTx() {

	// Retrieve ifb statistics
	// ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n
	//                    Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0)
	str := "tc -s qdisc show dev ifb" + u.ifbNumber
	out, err := cmdExec(str)
	if err != nil {
@@ -160,84 +174,47 @@ func (u *destination) processRxTx(rc *redis.Connector) {
		log.Error(err)
		return
	}
	//ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms  10.0ms 50% loss 50% rate 2Mbit\n Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0
	allStr := strings.Split(out, " ")

	//we have to read the allStr from the back since based on the results are always at the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	var rcvedPkts int
	var droppedPkts int
	var rcvedBytes int
	if len(allStr) > 20 {
		rcvedPkts, _ = strconv.Atoi(allStr[len(allStr)-15])
		droppedPkts, _ = strconv.Atoi(allStr[len(allStr)-12][:len(allStr[len(allStr)-12])-1])
		rcvedBytes, _ = strconv.Atoi(allStr[len(allStr)-17])
	} else {
		log.Error("Error in the ifb statistics output: ", allStr)
		rcvedPkts = 0
		droppedPkts = 0
		rcvedBytes = 0
	}

	//dropped rate in %
	var pktDroppedRate float64
	pktDroppedRateStr := "0"

	totalPkts := rcvedPkts + droppedPkts
	if totalPkts > 0 {
		top := droppedPkts * 100
		pktDroppedRate = (float64(top)) / float64(totalPkts)
		pktDroppedRateStr = strconv.FormatFloat(pktDroppedRate, 'f', 3, 64)
	// Parse ifb stats
	// NOTE: we have to read the ifbStats from the back since based on the results are always at
	//       the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	ifbStats := strings.Split(out, " ")
	var curRxBytes int
	if len(ifbStats) > 20 {
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-17])
	} else {
		log.Error("Error in the ifb statistics output: ", ifbStats)
	}

	currentTime := time.Now()

	previousRcvedBytes := u.historyRx.rcvedBytes
	// Get timestamp for calculations
	curTime := time.Now()

	var throughput float64
	var diffInMs int
	if previousRcvedBytes != 0 {

		previousTime := u.historyRx.time

		diff := currentTime.Sub(previousTime)
		diffInSeconds := diff.Seconds()
		diffInMs = int(diffInSeconds * 1000)
		throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds
	// Calculate throughput in Mbps
	var tput float64
	rxBytes := curRxBytes - u.prevRx.rxBytes
	if rxBytes != 0 {
		timeDiff := curTime.Sub(u.prevRx.time).Seconds()
		tput = (8 * float64(rxBytes) / timeDiff) / 1000000
	}

	//all the throughput in Mbps
	throughputVal := strconv.FormatFloat(throughput/1000000, 'f', 3, 64)

	u.historyRx.time = currentTime
	u.historyRx.rcvedBytes = rcvedBytes

	var stats = make(map[string]interface{})
	stats["uniqueName"] = PodName
	stats["trafficFrom"] = u.remoteName
	stats["totalReceivedPkts"] = rcvedPkts
	stats["totalDroppedPkts"] = droppedPkts
	stats["droppedPktRate"] = pktDroppedRateStr
	stats["totalReceivedBytes"] = rcvedBytes
	stats["receivedBytesDuringInterval"] = rcvedBytes - previousRcvedBytes
	stats["intervalInMs"] = diffInMs
	stats["throughput"] = throughputVal
	// Store latest values for next calculation
	u.prevRx.time = curTime
	u.prevRx.rxBytes = curRxBytes

	var throughputStats = make(map[string]interface{})
	throughputStats[u.remoteName] = throughputVal

	//store statistics but only if the entry exists
	key := moduleMetrics + ":" + PodName + ":" + u.remoteName
	if rc.EntryExists(key) {
		_ = rc.SetEntry(key, stats)
	}
	key = moduleMetrics + ":" + PodName + ":throughput"
	// Store throughput metric if entry exists
	var tputStats = make(map[string]interface{})
	tputStats[u.remoteName] = tput
	key := moduleMetrics + ":" + PodName + ":throughput"
	if rc.EntryExists(key) {
		_ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats)
		_ = rc.SetEntry(key, tputStats)
	}
}

func (u *destination) logRxTx(rc *redis.Connector) {
func (u *destination) logRxTx() {

	// Retrieve ifb statistics
	// ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n
	//                    Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0)
	str := "tc -s qdisc show dev ifb" + u.ifbNumber
	out, err := cmdExec(str)
	if err != nil {
@@ -245,67 +222,66 @@ func (u *destination) logRxTx(rc *redis.Connector) {
		log.Error(err)
		return
	}
	//ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms  10.0ms 50% loss 50% rate 2Mbit\n Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0
	allStr := strings.Split(out, " ")

	//we have to read the allStr from the back since based on the results are always at the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	var rcvedPkts int
	var droppedPkts int
	var rcvedBytes int
	if len(allStr) > 20 {
		rcvedPkts, _ = strconv.Atoi(allStr[len(allStr)-15])
		droppedPkts, _ = strconv.Atoi(allStr[len(allStr)-12][:len(allStr[len(allStr)-12])-1])
		rcvedBytes, _ = strconv.Atoi(allStr[len(allStr)-17])

	// Parse ifb stats
	// NOTE: we have to read the ifbStats from the back since based on the results are always at
	//       the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	ifbStats := strings.Split(out, " ")
	var curRxPkt int
	var curRxPktDrop int
	var curRxBytes int
	if len(ifbStats) > 20 {
		curRxPkt, _ = strconv.Atoi(ifbStats[len(ifbStats)-15])
		curRxPktDrop, _ = strconv.Atoi(ifbStats[len(ifbStats)-12][:len(ifbStats[len(ifbStats)-12])-1])
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-17])
	} else {
		log.Error("Error in the ifb statistics output: ", allStr)
		rcvedPkts = 0
		droppedPkts = 0
		rcvedBytes = 0
		log.Error("Error in the ifb statistics output: ", ifbStats)
	}

	//dropped rate in %
	var pktDroppedRate float64
	pktDroppedRateStr := "0"
	// Get timestamp for calculations
	curTime := time.Now()

	totalPkts := rcvedPkts + droppedPkts
	if totalPkts > 0 {
		top := droppedPkts * 100
		pktDroppedRate = (float64(top)) / float64(totalPkts)
		pktDroppedRateStr = strconv.FormatFloat(pktDroppedRate, 'f', 3, 64)
	// Calculate packet loss percentage
	var loss float64
	rxPkt := curRxPkt - u.prevRxLog.rxPkt
	rxPktDrop := curRxPktDrop - u.prevRxLog.rxPktDrop
	totalRxPkt := rxPkt + rxPktDrop
	if totalRxPkt > 0 {
		loss = (float64(rxPktDrop) / float64(totalRxPkt)) * 100
	}

	currentTime := time.Now()

	previousRcvedBytes := u.historyLogRx.rcvedBytes

	var throughput float64
	if previousRcvedBytes != 0 {

		previousTime := u.historyLogRx.time

		diff := currentTime.Sub(previousTime)
		diffInSeconds := diff.Seconds()
		throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds
	lossStr := strconv.FormatFloat(loss, 'f', 3, 64)

	// Calculate throughput in Mbps
	var tput float64
	rxBytes := curRxBytes - u.prevRxLog.rxBytes
	if rxBytes != 0 {
		timeDiff := curTime.Sub(u.prevRxLog.time).Seconds()
		tput = (8 * float64(rxBytes) / timeDiff) / 1000000
	}
	tputStr := strconv.FormatFloat(tput, 'f', 3, 64) + " Mbps"

	var throughputStr, throughputVal string
	//all the throughput in Mbps
	throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64)
	throughputStr = throughputVal + " Mbps"
	// Store latest values for next calculation
	u.prevRxLog.time = curTime
	u.prevRxLog.rxBytes = curRxBytes
	u.prevRxLog.rxPkt = curRxPkt
	u.prevRxLog.rxPktDrop = curRxPktDrop

	u.historyLogRx.time = currentTime
	u.historyLogRx.rcvedBytes = rcvedBytes
	// Store latency metric
	err = metricStore.SetTrafficMetric(u.remoteName, PodName, tput, loss)
	if err != nil {
		log.Error("Failed to set traffic metric")
	}

	log.WithFields(log.Fields{
		"meep.log.component":     "sidecar",
		"meep.log.msgType":       "ingressPacketStats",
		"meep.log.src":           u.remoteName,
		"meep.log.dest":          u.hostName,
		"meep.log.rx":            rcvedPkts,
		"meep.log.rxd":           droppedPkts,
		"meep.log.rxBytes":       rcvedBytes,
		"meep.log.throughput":    throughput / 1000000, //converting bps to mbps for graph display
		"meep.log.throughputStr": throughputStr,
		"meep.log.packet-loss":   pktDroppedRateStr,
		"meep.log.rx":            rxPkt,
		"meep.log.rxd":           rxPktDrop,
		"meep.log.rxBytes":       rxBytes,
		"meep.log.throughput":    tput,
		"meep.log.throughputStr": tputStr,
		"meep.log.packet-loss":   lossStr,
	}).Info("Measurements log")
}
+6 −3
Original line number Diff line number Diff line
@@ -4,6 +4,7 @@ go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/coreos/go-iptables v0.4.0
	github.com/gogo/protobuf v1.2.1 // indirect
@@ -22,6 +23,8 @@ require (
	sigs.k8s.io/yaml v1.1.0 // indirect
)

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger

replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store => ../../go-packages/meep-metric-store
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+2 −0
Original line number Diff line number Diff line
@@ -18,6 +18,8 @@ github.com/google/gofuzz v1.0.0 h1:A8PeW59pxE9IoFRqBp37U+mSNaQoZ46F1f0f863XSXw=
github.com/google/gofuzz v1.0.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e h1:txQltCyjXAqVVSZDArPEhUTg35hKwVIuXwtQo7eAMNQ=
github.com/influxdata/influxdb1-client v0.0.0-20190809212627-fc22c7df067e/go.mod h1:qj24IKcXYK6Iy9ceXlo3Tc+vtHo9lIhSX5JddghvEPo=
github.com/json-iterator/go v1.1.6 h1:MrUvLMLTMxbqFJ9kzlvat/rYZqZnW3u4wkLzWTaFwKs=
github.com/json-iterator/go v1.1.6/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU=
github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q=
Loading