Unverified Commit 3b327ef4 authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #43 from pastorsx/kd_sp32_model

Net Char Manager support for latency, jitter & packet loss + TC Engine cleanup
parents e20609e2 eb433f06
Loading
Loading
Loading
Loading
+1 −3
Original line number Diff line number Diff line
@@ -254,9 +254,7 @@ func processScenario(model *mod.Model) error {
	mgm.netLocList = append(mgm.netLocList, model.GetNodeNames("DEFAULT")...)

	// Get list of processes
	procNames := model.GetNodeNames("CLOUD-APP")
	procNames = append(procNames, model.GetNodeNames("EDGE-APP")...)
	procNames = append(procNames, model.GetNodeNames("UE-APP")...)
	procNames := model.GetNodeNames("CLOUD-APP", "EDGE-APP", "UE-APP")

	// Get network graph from model
	mgm.networkGraph = model.GetNetworkGraph()
+1 −0
Original line number Diff line number Diff line
@@ -6,6 +6,7 @@ require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-ctrl-engine-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mg-manager-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-net-char-mgr v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/gogo/protobuf v1.2.1 // indirect
+210 −886

File changed.

Preview size limit exceeded, changes collapsed.

+80 −27
Original line number Diff line number Diff line
@@ -48,6 +48,7 @@ type destination struct {
	ifbNumber    string
	history      *history
	historyRx    *historyRx
	historyLogRx *historyRx
}

type stat struct {
@@ -62,8 +63,6 @@ type stat struct {

const moduleMetrics string = "metrics"

var elasticLogPacing uint

func (u *destination) ping(pinger *Pinger) {
	rtt, err := pinger.Ping(u.remote, opts.timeout)
	if err != nil {
@@ -206,10 +205,8 @@ func (u *destination) processRxTx(rc *redis.Connector) {
		throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds
	}

	var throughputStr, throughputVal string
	//all the throughput in Mbps
	throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64)
	throughputStr = throughputVal + " Mbps"
	throughputVal := strconv.FormatFloat(throughput/1000000, 'f', 3, 64)

	u.historyRx.time = currentTime
	u.historyRx.rcvedBytes = rcvedBytes
@@ -237,10 +234,68 @@ func (u *destination) processRxTx(rc *redis.Connector) {
	if rc.EntryExists(key) {
		_ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats)
	}
}

func (u *destination) logRxTx(rc *redis.Connector) {

	str := "tc -s qdisc show dev ifb" + u.ifbNumber
	out, err := cmdExec(str)
	if err != nil {
		log.Error("tc -s qdisc show dev ifb", u.ifbNumber)
		log.Error(err)
		return
	}
	//ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms  10.0ms 50% loss 50% rate 2Mbit\n Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0
	allStr := strings.Split(out, " ")

	//we have to read the allStr from the back since based on the results are always at the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	var rcvedPkts int
	var droppedPkts int
	var rcvedBytes int
	if len(allStr) > 20 {
		rcvedPkts, _ = strconv.Atoi(allStr[len(allStr)-15])
		droppedPkts, _ = strconv.Atoi(allStr[len(allStr)-12][:len(allStr[len(allStr)-12])-1])
		rcvedBytes, _ = strconv.Atoi(allStr[len(allStr)-17])
	} else {
		log.Error("Error in the ifb statistics output: ", allStr)
		rcvedPkts = 0
		droppedPkts = 0
		rcvedBytes = 0
	}

	//dropped rate in %
	var pktDroppedRate float64
	pktDroppedRateStr := "0"

	totalPkts := rcvedPkts + droppedPkts
	if totalPkts > 0 {
		top := droppedPkts * 100
		pktDroppedRate = (float64(top)) / float64(totalPkts)
		pktDroppedRateStr = strconv.FormatFloat(pktDroppedRate, 'f', 3, 64)
	}

	currentTime := time.Now()

	previousRcvedBytes := u.historyLogRx.rcvedBytes

	var throughput float64
	if previousRcvedBytes != 0 {

		previousTime := u.historyLogRx.time

		diff := currentTime.Sub(previousTime)
		diffInSeconds := diff.Seconds()
		throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds
	}

	var throughputStr, throughputVal string
	//all the throughput in Mbps
	throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64)
	throughputStr = throughputVal + " Mbps"

	u.historyLogRx.time = currentTime
	u.historyLogRx.rcvedBytes = rcvedBytes

	//pacing the logs in ES
	elasticLogPacing++
	if elasticLogPacing%opts.trafficIntervalsPerLog == 0 {
	log.WithFields(log.Fields{
		"meep.log.component":     "sidecar",
		"meep.log.msgType":       "ingressPacketStats",
@@ -254,5 +309,3 @@ func (u *destination) processRxTx(rc *redis.Connector) {
		"meep.log.packet-loss":   pktDroppedRateStr,
	}).Info("Measurements log")
}

}
+65 −39
Original line number Diff line number Diff line
@@ -22,7 +22,6 @@ import (
	"math/rand"
	"os"
	"os/exec"
	"strconv"
	"strings"
	"time"

@@ -94,6 +93,14 @@ var opts = struct {
	resolverTimeout:        15000 * time.Millisecond,
}

// NetChar
type NetChar struct {
	Latency    string
	Jitter     string
	PacketLoss string
	Throughput string
}

var pinger *Pinger
var PodName string
var ipTbl *ipt.IPTables
@@ -102,15 +109,12 @@ var letters = []rune(capLetters)
var serviceChains = map[string]string{}
var ifbs = map[string]string{}
var filters = map[string]string{}
var netcharMap = map[string]*NetChar{}

var measurementsRunning = false
var flushRequired = false
var firstTimePass = true

var currentTransactionId = 0
var dbTransactionId = 0
var lastTransactionIdApplied = 0

const redisAddr string = "meep-redis-master:6379"

var rc *redis.Connector
@@ -204,11 +208,7 @@ func eventHandler(channel string, payload string) {
}

func processNetCharMsg(payload string) {
	// NOTE: Payload contains only a transaction Id
	currentTransactionId, _ = strconv.Atoi(payload)
	_ = getTransactionIdApplied() //sets dbTransactionId and will apply it
	refreshNetCharRules()
	lastTransactionIdApplied = dbTransactionId
}

func processLbMsg(payload string) {
@@ -220,17 +220,26 @@ func refreshNetCharRules() {
	// Create shape rules
	_ = initializeOnFirstPass()

	// moduleName := "sidecar"
	// currentTime := time.Now()

	_ = createIfbs()

	// Create new filters (lower priority than the old one)
	_ = createFilters()

	// // Delete unused filters
	// Delete unused filters
	deleteUnusedFilters()

	// Delete unused ifbs
	deleteUnusedIfbs()

	// elapsed := time.Since(currentTime)
	// log.WithFields(log.Fields{
	// 	"meep.log.component": moduleName,
	// 	"meep.time.location": "refreshNetCharRules execution time",
	// 	"meep.time.exec":     elapsed,
	// }).Info("Measurements log")

	// Start measurements
	startMeasurementThreads()
}
@@ -461,6 +470,7 @@ func startMeasurementThreads() {
		callPing()
		go workLatency()
		go workRxTxPackets()
		go workLogRxTxData()
		measurementsRunning = true
	}
}
@@ -494,6 +504,9 @@ func callPing() {
				historyRx: &historyRx{
					rcvedBytes: 0,
				},
				historyLogRx: &historyRx{
					rcvedBytes: 0,
				},
			}

			opts.dests = append(opts.dests, &dst)
@@ -546,6 +559,24 @@ func workRxTxPackets() {
	}
}

func workLogRxTxData() {
	for {
		//only this one affects the destinations based on info in the DB

		sem <- 1

		for i, u := range opts.dests {
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
			go func(u *destination, i int) {
				u.logRxTx(rc)
			}(u, i)
		}
		<-sem

		time.Sleep(opts.interval)
	}
}

func createPing() ([]podShortElement, error) {
	var podsToPing []podShortElement
	keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":filter*"
@@ -568,21 +599,6 @@ func createPingHandler(key string, fields map[string]string, userData interface{
	return nil
}

func getTransactionIdApplied() error {
	keyName := moduleTcEngine + ":" + typeNet + ":dbState"
	err := rc.ForEachEntry(keyName, getDbStateHandler, nil)
	if err != nil {
		return err
	}
	return nil
}

func getDbStateHandler(key string, fields map[string]string, userData interface{}) error {
	var err error
	dbTransactionId, err = strconv.Atoi(fields["transactionIdStored"])
	return err
}

func createIfbs() error {
	keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":shape*"
	err := rc.ForEachEntry(keyName, createIfbsHandler, nil)
@@ -596,18 +612,12 @@ func createIfbsHandler(key string, fields map[string]string, userData interface{
	ifbNumber := fields["ifb_uniqueId"]

	_, exists := ifbs[ifbNumber]

	if !exists {
		_ = cmdCreateIfb(fields)
		ifbs[ifbNumber] = ifbNumber
		_ = cmdSetIfb(fields)
	} else {
		if lastTransactionIdApplied < currentTransactionId {
		_ = cmdSetIfb(fields)
			log.Info("Transactions processed: current ", currentTransactionId, " and last applied ", lastTransactionIdApplied)
		} else {
			log.Info("Transactions processed on the TC-Engine already applied ", currentTransactionId, " vs last applied ", lastTransactionIdApplied)
		}
	}

	return nil
@@ -751,15 +761,31 @@ func cmdSetIfb(shape map[string]string) error {
	if delayVariation != "0" {
		normalDistributionStr = "distribution normal"
	}

	nc := netcharMap[ifbNumber]
	if nc == nil {
		nc = new(NetChar)
		netcharMap[ifbNumber] = nc
	}
	//only apply if an update is needed
	if nc.Latency != delay || nc.Jitter != delayVariation || nc.PacketLoss != loss || nc.Throughput != dataRate {
		str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "%"
		if dataRate != "" && dataRate != "0" {
			str = str + " rate " + dataRate + "bit"
		}

		_, err := cmdExec(str)
		if err != nil {
			return err
		}

		//store the new values
		nc.Latency = delay
		nc.Jitter = delayVariation
		nc.PacketLoss = loss
		nc.Throughput = dataRate
	}

	return nil
}

Loading