Loading go-apps/meep-tc-engine/tc-engine.go +2 −0 Original line number Diff line number Diff line Loading @@ -544,11 +544,13 @@ func netCharUpdate(dstName string, srcName string, rate float64, latency float64 dstElement, found := netElemMap[dstName] if !found { log.Error("Failed to find flow destination: ", dstName) mutex.Unlock() return } filterInfo, found := dstElement.FilterInfoMap[srcName] if !found { log.Error("Failed to find flow source: ", srcName) mutex.Unlock() return } Loading go-apps/meep-tc-sidecar/destination.go +80 −27 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ type destination struct { ifbNumber string history *history historyRx *historyRx historyLogRx *historyRx } type stat struct { Loading @@ -62,8 +63,6 @@ type stat struct { const moduleMetrics string = "metrics" var elasticLogPacing uint func (u *destination) ping(pinger *Pinger) { rtt, err := pinger.Ping(u.remote, opts.timeout) if err != nil { Loading Loading @@ -206,10 +205,8 @@ func (u *destination) processRxTx(rc *redis.Connector) { throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds } var throughputStr, throughputVal string //all the throughput in Mbps throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64) throughputStr = throughputVal + " Mbps" throughputVal := strconv.FormatFloat(throughput/1000000, 'f', 3, 64) u.historyRx.time = currentTime u.historyRx.rcvedBytes = rcvedBytes Loading Loading @@ -237,10 +234,68 @@ func (u *destination) processRxTx(rc *redis.Connector) { if rc.EntryExists(key) { _ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats) } } func (u *destination) logRxTx(rc *redis.Connector) { str := "tc -s qdisc show dev ifb" + u.ifbNumber out, err := cmdExec(str) if err != nil { log.Error("tc -s qdisc show dev ifb", u.ifbNumber) log.Error(err) return } //ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0 allStr := strings.Split(out, " ") //we have to read the allStr from the back since based on the results are always at the end but the characteristic may be different (no pkt loss, no normal distribution, etc) var rcvedPkts int var droppedPkts int var rcvedBytes int if len(allStr) > 20 { rcvedPkts, _ = strconv.Atoi(allStr[len(allStr)-15]) droppedPkts, _ = strconv.Atoi(allStr[len(allStr)-12][:len(allStr[len(allStr)-12])-1]) rcvedBytes, _ = strconv.Atoi(allStr[len(allStr)-17]) } else { log.Error("Error in the ifb statistics output: ", allStr) rcvedPkts = 0 droppedPkts = 0 rcvedBytes = 0 } //dropped rate in % var pktDroppedRate float64 pktDroppedRateStr := "0" totalPkts := rcvedPkts + droppedPkts if totalPkts > 0 { top := droppedPkts * 100 pktDroppedRate = (float64(top)) / float64(totalPkts) pktDroppedRateStr = strconv.FormatFloat(pktDroppedRate, 'f', 3, 64) } currentTime := time.Now() previousRcvedBytes := u.historyLogRx.rcvedBytes var throughput float64 if previousRcvedBytes != 0 { previousTime := u.historyLogRx.time diff := currentTime.Sub(previousTime) diffInSeconds := diff.Seconds() throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds } var throughputStr, throughputVal string //all the throughput in Mbps throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64) throughputStr = throughputVal + " Mbps" u.historyLogRx.time = currentTime u.historyLogRx.rcvedBytes = rcvedBytes //pacing the logs in ES elasticLogPacing++ if elasticLogPacing%opts.trafficIntervalsPerLog == 0 { log.WithFields(log.Fields{ "meep.log.component": "sidecar", "meep.log.msgType": "ingressPacketStats", Loading @@ -254,5 +309,3 @@ func (u *destination) processRxTx(rc *redis.Connector) { "meep.log.packet-loss": pktDroppedRateStr, }).Info("Measurements log") } } go-apps/meep-tc-sidecar/main.go +65 −39 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ import ( "math/rand" "os" "os/exec" "strconv" "strings" "time" Loading Loading @@ -94,6 +93,14 @@ var opts = struct { resolverTimeout: 15000 * time.Millisecond, } // NetChar type NetChar struct { Latency string Jitter string PacketLoss string Throughput string } var pinger *Pinger var PodName string var ipTbl *ipt.IPTables Loading @@ -102,15 +109,12 @@ var letters = []rune(capLetters) var serviceChains = map[string]string{} var ifbs = map[string]string{} var filters = map[string]string{} var netcharMap = map[string]*NetChar{} var measurementsRunning = false var flushRequired = false var firstTimePass = true var currentTransactionId = 0 var dbTransactionId = 0 var lastTransactionIdApplied = 0 const redisAddr string = "meep-redis-master:6379" var rc *redis.Connector Loading Loading @@ -204,11 +208,7 @@ func eventHandler(channel string, payload string) { } func processNetCharMsg(payload string) { // NOTE: Payload contains only a transaction Id currentTransactionId, _ = strconv.Atoi(payload) _ = getTransactionIdApplied() //sets dbTransactionId and will apply it refreshNetCharRules() lastTransactionIdApplied = dbTransactionId } func processLbMsg(payload string) { Loading @@ -220,17 +220,26 @@ func refreshNetCharRules() { // Create shape rules _ = initializeOnFirstPass() // moduleName := "sidecar" // currentTime := time.Now() _ = createIfbs() // Create new filters (lower priority than the old one) _ = createFilters() // // Delete unused filters // Delete unused filters deleteUnusedFilters() // Delete unused ifbs deleteUnusedIfbs() // elapsed := time.Since(currentTime) // log.WithFields(log.Fields{ // "meep.log.component": moduleName, // "meep.time.location": "refreshNetCharRules execution time", // "meep.time.exec": elapsed, // }).Info("Measurements log") // Start measurements startMeasurementThreads() } Loading Loading @@ -461,6 +470,7 @@ func startMeasurementThreads() { callPing() go workLatency() go workRxTxPackets() go workLogRxTxData() measurementsRunning = true } } Loading Loading @@ -494,6 +504,9 @@ func callPing() { historyRx: &historyRx{ rcvedBytes: 0, }, historyLogRx: &historyRx{ rcvedBytes: 0, }, } opts.dests = append(opts.dests, &dst) Loading Loading @@ -546,6 +559,24 @@ func workRxTxPackets() { } } func workLogRxTxData() { for { //only this one affects the destinations based on info in the DB sem <- 1 for i, u := range opts.dests { //starting 1 thread for getting the rx-tx info and computing the appropriate metrics go func(u *destination, i int) { u.logRxTx(rc) }(u, i) } <-sem time.Sleep(opts.interval) } } func createPing() ([]podShortElement, error) { var podsToPing []podShortElement keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":filter*" Loading @@ -568,21 +599,6 @@ func createPingHandler(key string, fields map[string]string, userData interface{ return nil } func getTransactionIdApplied() error { keyName := moduleTcEngine + ":" + typeNet + ":dbState" err := rc.ForEachEntry(keyName, getDbStateHandler, nil) if err != nil { return err } return nil } func getDbStateHandler(key string, fields map[string]string, userData interface{}) error { var err error dbTransactionId, err = strconv.Atoi(fields["transactionIdStored"]) return err } func createIfbs() error { keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":shape*" err := rc.ForEachEntry(keyName, createIfbsHandler, nil) Loading @@ -596,18 +612,12 @@ func createIfbsHandler(key string, fields map[string]string, userData interface{ ifbNumber := fields["ifb_uniqueId"] _, exists := ifbs[ifbNumber] if !exists { _ = cmdCreateIfb(fields) ifbs[ifbNumber] = ifbNumber _ = cmdSetIfb(fields) } else { if lastTransactionIdApplied < currentTransactionId { _ = cmdSetIfb(fields) log.Info("Transactions processed: current ", currentTransactionId, " and last applied ", lastTransactionIdApplied) } else { log.Info("Transactions processed on the TC-Engine already applied ", currentTransactionId, " vs last applied ", lastTransactionIdApplied) } } return nil Loading Loading @@ -751,15 +761,31 @@ func cmdSetIfb(shape map[string]string) error { if delayVariation != "0" { normalDistributionStr = "distribution normal" } nc := netcharMap[ifbNumber] if nc == nil { nc = new(NetChar) netcharMap[ifbNumber] = nc } //only apply if an update is needed if nc.Latency != delay || nc.Jitter != delayVariation || nc.PacketLoss != loss || nc.Throughput != dataRate { str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "%" if dataRate != "" && dataRate != "0" { str = str + " rate " + dataRate + "bit" } _, err := cmdExec(str) if err != nil { return err } //store the new values nc.Latency = delay nc.Jitter = delayVariation nc.PacketLoss = loss nc.Throughput = dataRate } return nil } Loading go-packages/meep-model/model.go +0 −5 Original line number Diff line number Diff line Loading @@ -498,11 +498,6 @@ func (m *Model) parseNodes() (err error) { func (m *Model) refresh() (err error) { if m.Active { err = m.rc.JSONDelEntry(m.activeKey, ".") if err != nil { log.Error(err.Error()) return err } jsonScenario, err := json.Marshal(m.scenario) if err != nil { log.Error(err.Error()) Loading Loading
go-apps/meep-tc-engine/tc-engine.go +2 −0 Original line number Diff line number Diff line Loading @@ -544,11 +544,13 @@ func netCharUpdate(dstName string, srcName string, rate float64, latency float64 dstElement, found := netElemMap[dstName] if !found { log.Error("Failed to find flow destination: ", dstName) mutex.Unlock() return } filterInfo, found := dstElement.FilterInfoMap[srcName] if !found { log.Error("Failed to find flow source: ", srcName) mutex.Unlock() return } Loading
go-apps/meep-tc-sidecar/destination.go +80 −27 Original line number Diff line number Diff line Loading @@ -48,6 +48,7 @@ type destination struct { ifbNumber string history *history historyRx *historyRx historyLogRx *historyRx } type stat struct { Loading @@ -62,8 +63,6 @@ type stat struct { const moduleMetrics string = "metrics" var elasticLogPacing uint func (u *destination) ping(pinger *Pinger) { rtt, err := pinger.Ping(u.remote, opts.timeout) if err != nil { Loading Loading @@ -206,10 +205,8 @@ func (u *destination) processRxTx(rc *redis.Connector) { throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds } var throughputStr, throughputVal string //all the throughput in Mbps throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64) throughputStr = throughputVal + " Mbps" throughputVal := strconv.FormatFloat(throughput/1000000, 'f', 3, 64) u.historyRx.time = currentTime u.historyRx.rcvedBytes = rcvedBytes Loading Loading @@ -237,10 +234,68 @@ func (u *destination) processRxTx(rc *redis.Connector) { if rc.EntryExists(key) { _ = rc.SetEntry(moduleMetrics+":"+PodName+":throughput", throughputStats) } } func (u *destination) logRxTx(rc *redis.Connector) { str := "tc -s qdisc show dev ifb" + u.ifbNumber out, err := cmdExec(str) if err != nil { log.Error("tc -s qdisc show dev ifb", u.ifbNumber) log.Error(err) return } //ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0 allStr := strings.Split(out, " ") //we have to read the allStr from the back since based on the results are always at the end but the characteristic may be different (no pkt loss, no normal distribution, etc) var rcvedPkts int var droppedPkts int var rcvedBytes int if len(allStr) > 20 { rcvedPkts, _ = strconv.Atoi(allStr[len(allStr)-15]) droppedPkts, _ = strconv.Atoi(allStr[len(allStr)-12][:len(allStr[len(allStr)-12])-1]) rcvedBytes, _ = strconv.Atoi(allStr[len(allStr)-17]) } else { log.Error("Error in the ifb statistics output: ", allStr) rcvedPkts = 0 droppedPkts = 0 rcvedBytes = 0 } //dropped rate in % var pktDroppedRate float64 pktDroppedRateStr := "0" totalPkts := rcvedPkts + droppedPkts if totalPkts > 0 { top := droppedPkts * 100 pktDroppedRate = (float64(top)) / float64(totalPkts) pktDroppedRateStr = strconv.FormatFloat(pktDroppedRate, 'f', 3, 64) } currentTime := time.Now() previousRcvedBytes := u.historyLogRx.rcvedBytes var throughput float64 if previousRcvedBytes != 0 { previousTime := u.historyLogRx.time diff := currentTime.Sub(previousTime) diffInSeconds := diff.Seconds() throughput = 8 * (float64(rcvedBytes) - float64(previousRcvedBytes)) / diffInSeconds } var throughputStr, throughputVal string //all the throughput in Mbps throughputVal = strconv.FormatFloat(throughput/1000000, 'f', 3, 64) throughputStr = throughputVal + " Mbps" u.historyLogRx.time = currentTime u.historyLogRx.rcvedBytes = rcvedBytes //pacing the logs in ES elasticLogPacing++ if elasticLogPacing%opts.trafficIntervalsPerLog == 0 { log.WithFields(log.Fields{ "meep.log.component": "sidecar", "meep.log.msgType": "ingressPacketStats", Loading @@ -254,5 +309,3 @@ func (u *destination) processRxTx(rc *redis.Connector) { "meep.log.packet-loss": pktDroppedRateStr, }).Info("Measurements log") } }
go-apps/meep-tc-sidecar/main.go +65 −39 Original line number Diff line number Diff line Loading @@ -22,7 +22,6 @@ import ( "math/rand" "os" "os/exec" "strconv" "strings" "time" Loading Loading @@ -94,6 +93,14 @@ var opts = struct { resolverTimeout: 15000 * time.Millisecond, } // NetChar type NetChar struct { Latency string Jitter string PacketLoss string Throughput string } var pinger *Pinger var PodName string var ipTbl *ipt.IPTables Loading @@ -102,15 +109,12 @@ var letters = []rune(capLetters) var serviceChains = map[string]string{} var ifbs = map[string]string{} var filters = map[string]string{} var netcharMap = map[string]*NetChar{} var measurementsRunning = false var flushRequired = false var firstTimePass = true var currentTransactionId = 0 var dbTransactionId = 0 var lastTransactionIdApplied = 0 const redisAddr string = "meep-redis-master:6379" var rc *redis.Connector Loading Loading @@ -204,11 +208,7 @@ func eventHandler(channel string, payload string) { } func processNetCharMsg(payload string) { // NOTE: Payload contains only a transaction Id currentTransactionId, _ = strconv.Atoi(payload) _ = getTransactionIdApplied() //sets dbTransactionId and will apply it refreshNetCharRules() lastTransactionIdApplied = dbTransactionId } func processLbMsg(payload string) { Loading @@ -220,17 +220,26 @@ func refreshNetCharRules() { // Create shape rules _ = initializeOnFirstPass() // moduleName := "sidecar" // currentTime := time.Now() _ = createIfbs() // Create new filters (lower priority than the old one) _ = createFilters() // // Delete unused filters // Delete unused filters deleteUnusedFilters() // Delete unused ifbs deleteUnusedIfbs() // elapsed := time.Since(currentTime) // log.WithFields(log.Fields{ // "meep.log.component": moduleName, // "meep.time.location": "refreshNetCharRules execution time", // "meep.time.exec": elapsed, // }).Info("Measurements log") // Start measurements startMeasurementThreads() } Loading Loading @@ -461,6 +470,7 @@ func startMeasurementThreads() { callPing() go workLatency() go workRxTxPackets() go workLogRxTxData() measurementsRunning = true } } Loading Loading @@ -494,6 +504,9 @@ func callPing() { historyRx: &historyRx{ rcvedBytes: 0, }, historyLogRx: &historyRx{ rcvedBytes: 0, }, } opts.dests = append(opts.dests, &dst) Loading Loading @@ -546,6 +559,24 @@ func workRxTxPackets() { } } func workLogRxTxData() { for { //only this one affects the destinations based on info in the DB sem <- 1 for i, u := range opts.dests { //starting 1 thread for getting the rx-tx info and computing the appropriate metrics go func(u *destination, i int) { u.logRxTx(rc) }(u, i) } <-sem time.Sleep(opts.interval) } } func createPing() ([]podShortElement, error) { var podsToPing []podShortElement keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":filter*" Loading @@ -568,21 +599,6 @@ func createPingHandler(key string, fields map[string]string, userData interface{ return nil } func getTransactionIdApplied() error { keyName := moduleTcEngine + ":" + typeNet + ":dbState" err := rc.ForEachEntry(keyName, getDbStateHandler, nil) if err != nil { return err } return nil } func getDbStateHandler(key string, fields map[string]string, userData interface{}) error { var err error dbTransactionId, err = strconv.Atoi(fields["transactionIdStored"]) return err } func createIfbs() error { keyName := moduleTcEngine + ":" + typeNet + ":" + PodName + ":shape*" err := rc.ForEachEntry(keyName, createIfbsHandler, nil) Loading @@ -596,18 +612,12 @@ func createIfbsHandler(key string, fields map[string]string, userData interface{ ifbNumber := fields["ifb_uniqueId"] _, exists := ifbs[ifbNumber] if !exists { _ = cmdCreateIfb(fields) ifbs[ifbNumber] = ifbNumber _ = cmdSetIfb(fields) } else { if lastTransactionIdApplied < currentTransactionId { _ = cmdSetIfb(fields) log.Info("Transactions processed: current ", currentTransactionId, " and last applied ", lastTransactionIdApplied) } else { log.Info("Transactions processed on the TC-Engine already applied ", currentTransactionId, " vs last applied ", lastTransactionIdApplied) } } return nil Loading Loading @@ -751,15 +761,31 @@ func cmdSetIfb(shape map[string]string) error { if delayVariation != "0" { normalDistributionStr = "distribution normal" } nc := netcharMap[ifbNumber] if nc == nil { nc = new(NetChar) netcharMap[ifbNumber] = nc } //only apply if an update is needed if nc.Latency != delay || nc.Jitter != delayVariation || nc.PacketLoss != loss || nc.Throughput != dataRate { str := "tc qdisc change dev ifb" + ifbNumber + " handle 1:0 root netem delay " + delay + "ms " + delayVariation + "ms " + delayCorrelation + "% " + normalDistributionStr + " loss " + lossInteger + "." + lossFraction + "%" if dataRate != "" && dataRate != "0" { str = str + " rate " + dataRate + "bit" } _, err := cmdExec(str) if err != nil { return err } //store the new values nc.Latency = delay nc.Jitter = delayVariation nc.PacketLoss = loss nc.Throughput = dataRate } return nil } Loading
go-packages/meep-model/model.go +0 −5 Original line number Diff line number Diff line Loading @@ -498,11 +498,6 @@ func (m *Model) parseNodes() (err error) { func (m *Model) refresh() (err error) { if m.Active { err = m.rc.JSONDelEntry(m.activeKey, ".") if err != nil { log.Error(err.Error()) return err } jsonScenario, err := json.Marshal(m.scenario) if err != nil { log.Error(err.Error()) Loading