Unverified Commit 503b47dc authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #55 from pastorsx/sp_dev_sidecar_opt_term_issue

sidecar optimization - evicted pods - termination optimization
parents 60b89f9e 64ba49b0
Loading
Loading
Loading
Loading
+33 −8
Original line number Diff line number Diff line
@@ -354,6 +354,16 @@ func ceActivateScenario(w http.ResponseWriter, r *http.Request) {
	scenarioName := vars["name"]
	log.Debug("Scenario name: ", scenarioName)

	if activeModel == nil {
		var err error
		activeModel, err = mod.NewModel(mod.DbAddress, moduleName, "activeScenario")
		if err != nil {
			log.Error("Failed to create model: ", err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
	}

	// Make sure scenario is not already deployed
	if activeModel.Active {
		log.Error("Scenario already active")
@@ -402,7 +412,7 @@ func ceActivateScenario(w http.ResponseWriter, r *http.Request) {
func ceGetActiveScenario(w http.ResponseWriter, r *http.Request) {
	log.Debug("CEGetActiveScenario")

	if !activeModel.Active {
	if activeModel == nil || !activeModel.Active {
		http.Error(w, "No scenario is active", http.StatusNotFound)
		return
	}
@@ -426,7 +436,7 @@ func ceGetActiveScenario(w http.ResponseWriter, r *http.Request) {
func ceGetActiveNodeServiceMaps(w http.ResponseWriter, r *http.Request) {
	var filteredList *[]ceModel.NodeServiceMaps

	if !activeModel.Active {
	if activeModel == nil || !activeModel.Active {
		http.Error(w, "No scenario is active", http.StatusNotFound)
		return
	}
@@ -510,14 +520,14 @@ func ceGetActiveNodeServiceMaps(w http.ResponseWriter, r *http.Request) {
func ceTerminateScenario(w http.ResponseWriter, r *http.Request) {
	log.Debug("ceTerminateScenario")

	if !activeModel.Active {
	if activeModel == nil || !activeModel.Active {
		http.Error(w, "No scenario is active", http.StatusNotFound)
		return
	}

	err := activeModel.Deactivate()
	if err != nil {
		log.Error(err.Error())
		log.Error("Failed to deactivate: ", err.Error())
		http.Error(w, err.Error(), http.StatusNotFound)
		return
	}
@@ -546,7 +556,7 @@ func ceTerminateScenario(w http.ResponseWriter, r *http.Request) {
func ceSendEvent(w http.ResponseWriter, r *http.Request) {
	log.Debug("ceSendEvent")

	if !activeModel.Active {
	if activeModel == nil || !activeModel.Active {
		http.Error(w, "No scenario is active", http.StatusNotFound)
		return
	}
@@ -614,6 +624,8 @@ func ceGetStates(w http.ResponseWriter, r *http.Request) {

	subKey := ""
	var podsStatus ceModel.PodsStatus
	var podsStatusInReply ceModel.PodsStatus

	// Retrieve client ID & service name from query parameters
	query := r.URL.Query()
	longParam := query.Get("long")
@@ -661,27 +673,40 @@ func ceGetStates(w http.ResponseWriter, r *http.Request) {
		podsStatus.PodStatus = append(podsStatus.PodStatus, podStatus)
		// ***** virt-engine running or not code END

		//if some are missing... its because its coming up and as such... we cannot return a success yet... adding one entry that will be false

		corePods := getCorePodsList()
		uniqueCorePodsInReply := make(map[string]ceModel.PodStatus)

		//loop through each of them by name
		for _, statusPod := range podsStatus.PodStatus {
			for corePod := range corePods {
				if strings.Contains(statusPod.Name, corePod) {
					corePods[corePod] = true
					//filter for reporting one pod for each core pod type (we send the RUNNING one if any, otherwise we send whatever the failed one we have)
					storedUniqueCorePod := uniqueCorePodsInReply[corePod]
					if storedUniqueCorePod.Name != "" {
						if storedUniqueCorePod.LogicalState != "Running" && storedUniqueCorePod.LogicalState != statusPod.LogicalState {
							uniqueCorePodsInReply[corePod] = statusPod
						}
					} else {
						uniqueCorePodsInReply[corePod] = statusPod
					}

					break
				}
			}
		}

		for _, uniqueCorePod := range uniqueCorePodsInReply {
			podsStatusInReply.PodStatus = append(podsStatusInReply.PodStatus, uniqueCorePod)
		}

		//loop through the list of pods to see which one might be missing
		for corePod := range corePods {
			if !corePods[corePod] {
				var podStatus ceModel.PodStatus
				podStatus.Name = corePod
				podStatus.LogicalState = "NotAvailable"
				podsStatus.PodStatus = append(podsStatus.PodStatus, podStatus)
				podsStatusInReply.PodStatus = append(podsStatusInReply.PodStatus, podStatus)
			}
		}
	}
+8 −2
Original line number Diff line number Diff line
@@ -256,7 +256,10 @@ func addOrUpdateEntryInDB(monEngineInfo MonEngineInfo) {
	key := moduleMonEngine + ":MO-" + monEngineInfo.MeepOrigin + ":MS-" + monEngineInfo.MeepScenario + ":MA-" + monEngineInfo.MeepApp + ":" + monEngineInfo.PodName

	// Set rule information in DB
	_ = rc.SetEntry(key, fields)
	err := rc.SetEntry(key, fields)
	if err != nil {
		log.Error("Entry could not be updated in DB for ", monEngineInfo.MeepApp, ": ", err)
	}
}

func deleteEntryInDB(monEngineInfo MonEngineInfo) {
@@ -265,7 +268,10 @@ func deleteEntryInDB(monEngineInfo MonEngineInfo) {
	key := moduleMonEngine + ":MO-" + monEngineInfo.MeepOrigin + ":MS-" + monEngineInfo.MeepScenario + ":MA-" + monEngineInfo.MeepApp + ":" + monEngineInfo.PodName

	// Set rule information in DB
	_ = rc.DelEntry(key)
	err := rc.DelEntry(key)
	if err != nil {
		log.Error("Entry could not be deleted in DB for ", monEngineInfo.MeepApp, ": ", err)
	}
}

func k8sConnect() (err error) {
+1 −1
Original line number Diff line number Diff line
@@ -13,6 +13,6 @@
# limitations under the License.

FROM debian:9.6-slim
RUN apt-get update && apt-get install -y iputils-ping iproute2 iptables conntrack
RUN apt-get update && apt-get install -y iputils-ping iproute2 iptables conntrack net-tools
COPY ./meep-tc-sidecar /meep-tc-sidecar
ENTRYPOINT ["/meep-tc-sidecar"]
+16 −35
Original line number Diff line number Diff line
@@ -163,26 +163,16 @@ func (u *destination) compute() (st stat) {
	return
}

func (u *destination) processRxTx() {
func (u *destination) processRxTx(ifbStatsStr string) {

	// Retrieve ifb statistics
	// ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n
	//                    Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0)
	str := "tc -s qdisc show dev ifb" + u.ifbNumber
	out, err := cmdExec(str)
	if err != nil {
		log.Error("tc -s qdisc show dev ifb", u.ifbNumber)
		log.Error(err)
		return
	}

	// Parse ifb stats
	// Retrieve ifb statistics from passed string
	// NOTE: we have to read the ifbStats from the back since based on the results are always at
	//       the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	ifbStats := strings.Split(out, " ")
	ifbStats := strings.Split(ifbStatsStr, " ")

	var curRxBytes int
	if len(ifbStats) > 20 {
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-17])
	if len(ifbStats) >= 13 {
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-11])
	} else {
		log.Error("Error in the ifb statistics output: ", ifbStats)
	}
@@ -206,35 +196,25 @@ func (u *destination) processRxTx() {
	var tputStats = make(map[string]interface{})
	tputStats[u.remoteName] = tput
	key := moduleMetrics + ":" + PodName + ":throughput"

	if rc.EntryExists(key) {
		_ = rc.SetEntry(key, tputStats)
	}
}

func (u *destination) logRxTx() {
func (u *destination) logRxTx(ifbStatsStr string) {

	// Retrieve ifb statistics
	// ex :qdisc netem 1: root refcnt 2 limit 1000 delay 100.0ms 10.0ms 50% loss 50% rate 2Mbit\n
	//                    Sent 756 bytes 8 pkt (dropped 4, overlimits 0 requeues 0)
	str := "tc -s qdisc show dev ifb" + u.ifbNumber
	out, err := cmdExec(str)
	if err != nil {
		log.Error("tc -s qdisc show dev ifb", u.ifbNumber)
		log.Error(err)
		return
	}

	// Parse ifb stats
	// Retrieve ifb statistics from passed string
	// NOTE: we have to read the ifbStats from the back since based on the results are always at
	//       the end but the characteristic may be different (no pkt loss, no normal distribution, etc)
	ifbStats := strings.Split(out, " ")
	ifbStats := strings.Split(ifbStatsStr, " ")
	var curRxPkt int
	var curRxPktDrop int
	var curRxBytes int
	if len(ifbStats) > 20 {
		curRxPkt, _ = strconv.Atoi(ifbStats[len(ifbStats)-15])
		curRxPktDrop, _ = strconv.Atoi(ifbStats[len(ifbStats)-12][:len(ifbStats[len(ifbStats)-12])-1])
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-17])
	if len(ifbStats) >= 13 {
		curRxPkt, _ = strconv.Atoi(ifbStats[len(ifbStats)-9])
		curRxPktDrop, _ = strconv.Atoi(ifbStats[len(ifbStats)-6][:len(ifbStats[len(ifbStats)-6])-1])
		curRxBytes, _ = strconv.Atoi(ifbStats[len(ifbStats)-11])
	} else {
		log.Error("Error in the ifb statistics output: ", ifbStats)
	}
@@ -277,7 +257,8 @@ func (u *destination) logRxTx() {
	semLatencyMap.Unlock()
	metric.UlTput = tput
	metric.UlLoss = loss
	err = metricStore.SetCachedNetworkMetric(metric)

	err := metricStore.SetCachedNetworkMetric(metric)
	if err != nil {
		log.Error("Failed to set network metric")
	}
+67 −11
Original line number Diff line number Diff line
@@ -558,9 +558,10 @@ func workLatency() {
			go func(u *destination, i int) {
				u.ping(pinger)
			}(u, i)
			go func(u *destination, i int) {
			//go func(u *destination, i int) {
			u.compute()
			}(u, i)
			//}(u, i)

		}
		semOptsDests.Unlock()

@@ -574,11 +575,39 @@ func workRxTxPackets() {

		semOptsDests.Lock()

		for i, u := range opts.dests {
		str := "tc -s qdisc show"
		out, err := cmdExec(str)
		if err != nil {
			log.Error("tc -s qdisc show")
			log.Error(err)
			return
		}
		//split line by line
		lineStrings := strings.Split(out, "\n")

		//store the mapping
		qdiscResults := make(map[string]string)

		lineIndex := 0
		for lineIndex < (len(lineStrings) - 1) {
			//each entry has 3 lines
			//first line get the ifb
			line1 := lineStrings[lineIndex]
			//second line are the stats we need
			line2 := lineStrings[lineIndex+1]
			//third line is not useful stats for our application
			//line3 := lineStrings[lineIndex+2]
			ifb := strings.Split(line1, " ")
			//store the mapping
			qdiscResults[ifb[4]] = line2
			lineIndex = lineIndex + 3
		}

		for _, u := range opts.dests {
			//get the data for all, parse the output and transmit to each
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
			go func(u *destination, i int) {
				u.processRxTx()
			}(u, i)
			/*go*/
			u.processRxTx(qdiscResults["ifb"+u.ifbNumber])
		}
		semOptsDests.Unlock()

@@ -592,11 +621,38 @@ func workLogRxTxData() {

		semOptsDests.Lock()

		for i, u := range opts.dests {
		str := "tc -s qdisc show"
		out, err := cmdExec(str)
		if err != nil {
			log.Error("tc -s qdisc show")
			log.Error(err)
			return
		}
		//split line by line
		lineStrings := strings.Split(out, "\n")

		//store the mapping
		qdiscResults := make(map[string]string)

		lineIndex := 0
		for lineIndex < (len(lineStrings) - 1) {
			//each entry has 3 lines
			//first line get the ifb
			line1 := lineStrings[lineIndex]
			//second line are the stats we need
			line2 := lineStrings[lineIndex+1]
			//third line is not useful stats for our application
			//line3 := lineStrings[lineIndex+2]
			ifb := strings.Split(line1, " ")
			//store the mapping
			qdiscResults[ifb[4]] = line2
			lineIndex = lineIndex + 3
		}

		for _, u := range opts.dests {
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
			go func(u *destination, i int) {
				u.logRxTx()
			}(u, i)
			/*go*/
			u.logRxTx(qdiscResults["ifb"+u.ifbNumber])
		}
		semOptsDests.Unlock()

Loading