Commit 6ff758f2 authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

tc-sidecar updates complete + bug fixes

parent c7a5b508
Loading
Loading
Loading
Loading
+31 −11
Original line number Diff line number Diff line
@@ -43,7 +43,7 @@ type IpManager struct {
	clientset   *kubernetes.Clientset
	updateCb    IpAddrUpdateCb
	isConnected bool
	mutex       *sync.Mutex
	mutex       sync.Mutex
}

// NewIpManager - Creates and initialize an IP Manager instance
@@ -62,16 +62,43 @@ func NewIpManager(name string, sandboxName string, updateCb IpAddrUpdateCb) (im
}

// SetPodList - Set list of pods to monitor IP addresses for
func (im *IpManager) SetPodList(podList []string) {
func (im *IpManager) SetPodList(podList map[string]bool) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	// Remove stale entries
	for podName := range im.podIpMap {
		if _, found := podList[podName]; !found {
			delete(im.podIpMap, podName)
		}
	}

	// Add missing entries
	for podName := range podList {
		if _, found := im.podIpMap[podName]; !found {
			im.podIpMap[podName] = IP_ADDR_NONE
		}
	}
}

// SetSvcList - Set list of services to monitor IP addresses for
func (im *IpManager) SetSvcList(svcList []string) {
func (im *IpManager) SetSvcList(svcList map[string]bool) {
	im.mutex.Lock()
	defer im.mutex.Unlock()

	// Remove stale entries
	for svcName := range im.svcIpMap {
		if _, found := svcList[svcName]; !found {
			delete(im.svcIpMap, svcName)
		}
	}

	// Add missing entries
	for svcName := range svcList {
		if _, found := im.svcIpMap[svcName]; !found {
			im.svcIpMap[svcName] = IP_ADDR_NONE
		}
	}
}

// GetPodIp - Get Pod IP address
@@ -85,7 +112,7 @@ func (im *IpManager) GetPodIp(podName string) string {

// GetPodIp - Get Svc IP address
func (im *IpManager) GetSvcIp(svcName string) string {
	svcIp, found := im.podIpMap[svcName]
	svcIp, found := im.svcIpMap[svcName]
	if !found {
		svcIp = IP_ADDR_NONE
	}
@@ -171,13 +198,6 @@ func (im *IpManager) refreshIpAddresses() {
			log.Debug("Setting podName: ", podName, " ip: ", podIp)
			im.podIpMap[podName] = podIp
			updated = true

			// //set the element if it has already been created by the scenario parsing
			// element := netElemMap[podName]
			// if element != nil {
			// 	element.Ip = podIp
			// 	element.NextUniqueNumber = 1
			// }
		}
	}

+4 −4
Original line number Diff line number Diff line
@@ -88,7 +88,7 @@ func (re *RoutingEngine) RefreshLbRules() {
	for _, netElem := range netElemList.NetworkElements {
		podInfo := podInfoMap[netElem.Name]
		if podInfo == nil {
			log.Error("Failed to find network element")
			log.Error("Failed to find network element: ", netElem.Name)
			continue
		}

@@ -99,7 +99,7 @@ func (re *RoutingEngine) RefreshLbRules() {
	}

	// Apply new MG Service mapping rules
	re.applyMgSvcMapping()
	re.applyLbRules()

	// Inform sidecars of LB rule updates
	re.publishLbRulesUpdate()
@@ -118,8 +118,8 @@ func (re *RoutingEngine) publishLbRulesUpdate() {
}

// Generate & store rules based on mapping
func (re *RoutingEngine) applyMgSvcMapping() {
	log.Debug("applyMgSvcMapping")
func (re *RoutingEngine) applyLbRules() {
	log.Debug("applyLbRules")

	keys := map[string]bool{}

+46 −30
Original line number Diff line number Diff line
@@ -60,15 +60,11 @@ const THROUGHPUT_UNIT = 1000000 //convert from Mbps to bps
const DEFAULT_NET_CHAR_DB = 0
const redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"

// NetElem -
// NextUniqueNumber is reserving 2 spaces for each unique number to apply
// changes starting with odd number and using even number to apply the 1st
// change and come back on the odd number for the next update to apply
// Network Element
type NetElem struct {
	Name          string
	FilterInfoMap map[string]*FilterInfo
	Ip            string
	NextUniqueNumber int
}

// FilterInfo -
@@ -204,6 +200,7 @@ func Init() (err error) {
		log.Error("Failed to create model: ", err.Error())
		return err
	}
	log.Info("Active scenario model created")

	// Open Network Characteristics Store
	tce.netCharStore = new(NetCharStore)
@@ -224,9 +221,8 @@ func Init() (err error) {
		log.Error("Failed to create a netChar object. Error: ", err)
		return err
	}

	// Configure & Start Net Char Manager
	tce.netCharMgr.Register(netCharUpdate, updateComplete)
	log.Info("Network Characteristics Manager instance created")

	// Create new IP Manager instance
	tce.ipManager, err = NewIpManager(moduleName, tce.sandboxName, ipAddrUpdated)
@@ -234,6 +230,7 @@ func Init() (err error) {
		log.Error("Failed to create IP Manager. Error: ", err)
		return err
	}
	log.Info("IP Manager instance created")

	// Create new Routing Engine instance
	tce.routingEngine, err = NewRoutingEngine(moduleName, tce.sandboxName)
@@ -241,6 +238,7 @@ func Init() (err error) {
		log.Error("Failed to create Routing Engine. Error: ", err)
		return err
	}
	log.Info("Routing Engine instance created")

	// Process scenario in case it is already active
	processScenarioActivate()
@@ -290,6 +288,13 @@ func processScenarioActivate() {
	// Sync with active scenario store
	tce.activeModel.UpdateScenario()

	// Make sure scenario is active
	scenarioName := tce.activeModel.GetScenarioName()
	if scenarioName == "" {
		log.Warn("Scenario not active")
		return
	}

	// Process new scenario
	err := processScenario(tce.activeModel)
	if err != nil {
@@ -320,6 +325,13 @@ func processScenarioUpdate(eventType string) {
	// Sync with active scenario store
	tce.activeModel.UpdateScenario()

	// Make sure scenario is active
	scenarioName := tce.activeModel.GetScenarioName()
	if scenarioName == "" {
		log.Warn("Scenario not active")
		return
	}

	// Process updated scenario
	err := processScenario(tce.activeModel)
	if err != nil {
@@ -362,8 +374,8 @@ func stopScenario() {
	mgSvcInfoMap = make(map[string]*MgServiceInfo)
	podInfoMap = make(map[string]*PodInfo)

	tce.ipManager.SetPodList([]string{})
	tce.ipManager.SetSvcList([]string{})
	tce.ipManager.SetPodList(map[string]bool{})
	tce.ipManager.SetSvcList(map[string]bool{})

	tce.netCharStore.rc.DBFlush(tce.netCharStore.baseKey)

@@ -424,7 +436,6 @@ func processScenario(model *mod.Model) error {
		if element == nil {
			element = new(NetElem)
			element.Name = proc.Name
			element.NextUniqueNumber = 1
			element.Ip = tce.ipManager.GetPodIp(proc.Name)
			element.FilterInfoMap = make(map[string]*FilterInfo)
			netElemMap[proc.Name] = element
@@ -493,8 +504,8 @@ func processScenario(model *mod.Model) error {
	}

	// Update Pod & Svc lists in IP Manager
	tce.ipManager.SetPodList(getKeys(podNames))
	tce.ipManager.SetSvcList(getKeys(svcNames))
	tce.ipManager.SetPodList(podNames)
	tce.ipManager.SetSvcList(svcNames)

	// Remove network elements that are no longer in scenario
	for procName := range netElemMap {
@@ -521,16 +532,6 @@ func ipAddrUpdated() {
	tce.routingEngine.RefreshLbRules()
}

func getKeys(m map[string]bool) []string {
	keys := make([]string, len(m))
	i := 0
	for k := range m {
		keys[i] = k
		i++
	}
	return keys
}

// Create & store new service & MG service information
func addServiceInfo(svcName string, svcPorts []dataModel.ServicePort, mgSvcName string, nodeName string, svcNames map[string]bool) {
	// Add to service list
@@ -554,7 +555,7 @@ func addServiceInfo(svcName string, svcPorts []dataModel.ServicePort, mgSvcName
	// Store MG Service info, if any
	if mgSvcName != "" {
		// Add to service list
		svcNames[svcName] = true
		svcNames[mgSvcName] = true

		// Add MG service to MG service info map if it does not exist yet
		mgSvcInfo, found := mgSvcInfoMap[mgSvcName]
@@ -673,16 +674,14 @@ func refreshNcRules() {
				filterInfo.SrcNetmask = "0"
				filterInfo.SrcPort = 0
				filterInfo.DstPort = 0
				filterInfo.UniqueNumber = dstElem.NextUniqueNumber
				filterInfo.UniqueNumber = getUniqueFilterNumber(dstElem)
				filterInfo.Latency = 0
				filterInfo.LatencyVariation = 0
				filterInfo.LatencyCorrelation = COMMON_CORRELATION
				filterInfo.Distribution = DEFAULT_DISTRIBUTION
				filterInfo.PacketLoss = 0.0
				filterInfo.DataRate = 0

				dstElem.FilterInfoMap[srcElem.Name] = filterInfo
				dstElem.NextUniqueNumber++
			}
		}

@@ -729,7 +728,7 @@ func applyNcRules() {
	}

	// Remove stale DB entries
	keyName := tce.netCharStore.baseKey + typeLb + ":*"
	keyName := tce.netCharStore.baseKey + typeNet + ":*"
	err := tce.netCharStore.rc.ForEachEntry(keyName, removeNcEntryHandler, &keys)
	if err != nil {
		log.Error("Failed to remove stale entries with err: ", err)
@@ -788,6 +787,23 @@ func setFilterRule(filterInfo *FilterInfo) (keyName string, err error) {
	return keyName, nil
}

func getUniqueFilterNumber(elem *NetElem) int {
	maxNum := 1000
	for num := 1; num < maxNum; num++ {
		isUnique := true
		for _, filter := range elem.FilterInfoMap {
			if num == filter.UniqueNumber {
				isUnique = false
				break
			}
		}
		if isUnique {
			return num
		}
	}
	return maxNum
}

// Used to print all the element information belonging to an NetElem object -- uncomment to use -- for debug purpose
// func printfElement(element NetElem) {
//      log.Debug("element name : ", element.Name)
+216 −166
Original line number Diff line number Diff line
@@ -41,6 +41,9 @@ import (

const moduleName string = "meep-tc-sidecar"

const redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
const influxDBAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"

const tcEngineKey string = "tc-engine:"
const metricsKey string = "metrics:"

@@ -59,7 +62,8 @@ const meSvcChain string = mePrefix + "SERVICES"
const ingressSvcChain string = ingressPrefix + "SERVICES"
const egressSvcChain string = egressPrefix + "SERVICES"
const maxChainLen int = 25
const capLetters = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
const capLetters string = "ABCDEFGHIJKLMNOPQRSTUVWXYZ"
const ipAddrNone string = "n/a"

const fieldSvcType string = "svc-type"
const fieldSvcName string = "svc-name"
@@ -69,16 +73,28 @@ const fieldSvcPort string = "svc-port"
const fieldLbSvcIp string = "lb-svc-ip"
const fieldLbSvcPort string = "lb-svc-port"

type podShortElement struct {
const DEFAULT_SIDECAR_DB = 0

type DestElement struct {
	name      string
	ipAddr    string
	IfbNumber string
}

var semOptsDests sync.Mutex
var semLatencyMap sync.Mutex
type SrcIps struct {
	PodIp string
	SvcIp string
}

type NetChar struct {
	Latency      string
	Jitter       string
	PacketLoss   string
	Throughput   string
	Distribution string
}

var opts = struct {
type Opts struct {
	timeout         time.Duration
	interval        time.Duration
	trafficInterval time.Duration
@@ -88,25 +104,11 @@ var opts = struct {
	bind6           string
	dests           []*destination
	resolverTimeout time.Duration
}{
	timeout:         100000 * time.Millisecond,
	interval:        1000 * time.Millisecond,
	trafficInterval: 100 * time.Millisecond,
	bind4:           "0.0.0.0",
	bind6:           "::",
	payloadSize:     56,
	statBufferSize:  50,
	resolverTimeout: 15000 * time.Millisecond,
}

// NetChar
type NetChar struct {
	Latency      string
	Jitter       string
	PacketLoss   string
	Throughput   string
	Distribution string
}
// Variables
var semOptsDests sync.Mutex
var semLatencyMap sync.Mutex

var pinger *Pinger
var PodName string
@@ -116,16 +118,11 @@ var ipTbl *ipt.IPTables
var letters = []rune(capLetters)
var serviceChains = map[string]string{}
var ifbs = map[string]string{}
var filters = map[string]string{}
var filters = map[string]*SrcIps{}
var netcharMap = map[string]*NetChar{}
var latestLatencyResultsMap map[string]int32

var measurementsRunning = false
var flushRequired = false
var firstTimePass = true

const redisAddr = "meep-redis-master.default.svc.cluster.local:6379"
const influxDBAddr = "http://meep-influxdb.default.svc.cluster.local:8086"

var mqLocal *mq.MsgQueue
var handlerId int
@@ -133,11 +130,19 @@ var rc *redis.Connector
var metricStore *met.MetricStore
var baseKey string
var metricsBaseKey string

const DEFAULT_SIDECAR_DB = 0

var nbAppliedOperations = 0

var opts Opts = Opts{
	timeout:         100000 * time.Millisecond,
	interval:        1000 * time.Millisecond,
	trafficInterval: 100 * time.Millisecond,
	bind4:           "0.0.0.0",
	bind6:           "::",
	payloadSize:     56,
	statBufferSize:  50,
	resolverTimeout: 15000 * time.Millisecond,
}

func init() {
	// Log as JSON instead of the default ASCII formatter.
	//log.MeepJSONLogInit("meep-tc-sidecar")
@@ -249,21 +254,40 @@ func initMeepSidecar() (err error) {
	// Connect to Metric Store
	metricStore, err = met.NewMetricStore(scenarioName, sandboxName, influxDBAddr, redisAddr)
	if err != nil {
		log.Error("Failed connection to Redis: ", err)
		log.Error("Failed connection to Redis. Error: ", err)
		return err
	}

	semLatencyMap.Lock()
	// Create & initialize pinger instance
	pinger, err = New(opts.bind4, opts.bind6)
	if err != nil {
		log.Error("Failed to create Pinger. Error: ", err)
		return err
	}
	if pinger.PayloadSize() != uint16(opts.payloadSize) {
		pinger.SetPayloadSize(uint16(opts.payloadSize))
	}

	// Initialize filters
	err = initializeFilters()
	if err != nil {
		log.Error("Failed to initialize filters. Error: ", err)
		return err
	}

	// Initialize latency results
	latestLatencyResultsMap = make(map[string]int32)
	semLatencyMap.Unlock()

	// Refresh Ping destinations
	refreshPingDests()

	return nil
}

// runMeepSidecar - Start TC Sidecar
func runMeepSidecar() (err error) {
	// Refresh TC rules to match DB state
	refreshNetCharRules()
	// Refresh NC rules to match DB state
	refreshNcRules()

	// Refresh LB IPtables rules to match DB state
	refreshLbRules()
@@ -276,6 +300,11 @@ func runMeepSidecar() (err error) {
		return err
	}

	// Start measurements
	go workLatency()
	go workRxTxPackets()
	go workLogRxTxData()

	return nil
}

@@ -287,34 +316,27 @@ func msgHandler(msg *mq.Msg, userData interface{}) {
		refreshLbRules()
	case mq.MsgTcNetRulesUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		refreshNetCharRules()
		refreshNcRules()
	default:
		log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg))
	}
}

func refreshNetCharRules() {
	// Create shape rules
	_ = initializeOnFirstPass()

	currentTime := time.Now()
func refreshNcRules() {
	nbAppliedOperations = 0
	currentTime := time.Now()

	// Update Shaping & filter rules
	_ = createIfbs()

	_ = createFilters()

	// Delete unused filters
	deleteUnusedFilters()

	// Delete unused ifbs
	deleteUnusedIfbs()

	elapsed := time.Since(currentTime)
	log.Debug("RefreshNetCharRules execution time for ", nbAppliedOperations, " updates, elapsed time: ", elapsed)

	// Start measurements
	startMeasurementThreads()
	// Refresh ping destinations
	refreshPingDests()
}

func refreshLbRules() {
@@ -483,6 +505,12 @@ func refreshLbRulesHandler(key string, fields map[string]string, userData interf
		"-j", "DNAT", "--to-destination", fields[fieldLbSvcIp]+":"+fields[fieldLbSvcPort],
		"-m", "comment", "--comment", service)

	// Ignore rules with missing IP addresses
	if fields[fieldSvcIp] == ipAddrNone || fields[fieldLbSvcIp] == ipAddrNone {
		log.Debug("Missing IP address for service: ", service)
		return nil
	}

	// Retrieve service chain name if service exists
	serviceChain, exists := serviceChains[service]
	if exists {
@@ -536,25 +564,22 @@ func refreshLbRulesHandler(key string, fields map[string]string, userData interf
	return nil
}

func startMeasurementThreads() {
	// Only start measurements if not already running
	if len(ifbs) != 0 && !measurementsRunning {
		// Populate opts.dests used by all
		callPing()
		go workLatency()
		go workRxTxPackets()
		go workLogRxTxData()
		measurementsRunning = true
	}
// refreshPingDests - Refresh ping destinations to match valid DB entries
func refreshPingDests() {
	// Get list of destinations with valid IP addresses
	var pingDests []DestElement
	keyName := baseKey + typeNet + ":" + PodName + ":filter*"
	err := rc.ForEachEntry(keyName, refreshPingDestsHandler, &pingDests)
	if err != nil {
		log.Error("Failed to update dest pod list. Error: ", err)
	}

func callPing() {
	podsToPing, _ := createPing()

	for _, pod := range podsToPing {
		remotes, err := resolve(pod.ipAddr, opts.resolverTimeout)
	// Create new dest list
	dests := []*destination{}
	for _, pingDest := range pingDests {
		remotes, err := resolve(pingDest.ipAddr, opts.resolverTimeout)
		if err != nil {
			log.Debug("error resolving host ", pod.name, "(", pod.ipAddr, ") err: ", err)
			log.Debug("error resolving host ", pingDest.name, "(", pingDest.ipAddr, ") err: ", err)
			continue
		}

@@ -564,13 +589,13 @@ func callPing() {
			}

			ipaddr := remote // need to create a copy
			name := pod.name
			dst := destination{
				host:       pod.ipAddr,
			name := pingDest.name
			dest := destination{
				host:       pingDest.ipAddr,
				hostName:   PodName,
				remote:     &ipaddr,
				remoteName: name,
				ifbNumber:  pod.IfbNumber,
				ifbNumber:  pingDest.IfbNumber,
				history: &history{
					results: make([]time.Duration, opts.statBufferSize),
				},
@@ -581,41 +606,45 @@ func callPing() {
					rxBytes: 0,
				},
			}
			dests = append(dests, &dest)
		}
	}

	// Update ping dest list
	semOptsDests.Lock()
			opts.dests = append(opts.dests, &dst)
	opts.dests = dests
	semOptsDests.Unlock()
}
	}

	//get a pinger instance
	if instance, err := New(opts.bind4, opts.bind6); err == nil {
		if instance.PayloadSize() != uint16(opts.payloadSize) {
			instance.SetPayloadSize(uint16(opts.payloadSize))
		}
		pinger = instance
		//defer pinger.Close()
	} else {
		panic(err)
func refreshPingDestsHandler(key string, fields map[string]string, userData interface{}) error {
	pingDests := userData.(*[]DestElement)
	var dest DestElement
	dest.name = fields["srcName"]
	dest.ipAddr = fields["srcIp"]
	dest.IfbNumber = fields["ifb_uniqueId"]

	// Append valid pods only
	if dest.ipAddr != ipAddrNone {
		*pingDests = append(*pingDests, dest)
	}
	return nil
}

func workLatency() {
	for {

		semOptsDests.Lock()
		for i, u := range opts.dests {
			//starting 2 threads, one for the pings, one for the computing part
			go func(u *destination, i int) {
				u.ping(pinger)
			}(u, i)
			//go func(u *destination, i int) {
			u.compute()
			//}(u, i)
		for i, dest := range opts.dests {
			// Send ping in a separate thread
			go func(dest *destination, i int) {
				dest.ping(pinger)
			}(dest, i)

			// Compute latest latency results for destination
			dest.compute()
		}
		semOptsDests.Unlock()

		// Wait before sending next set of pings
		time.Sleep(opts.interval)
	}
}
@@ -623,7 +652,6 @@ func workLatency() {
func workRxTxPackets() {
	for {
		// only this one affects the destinations based on info in the DB

		semOptsDests.Lock()

		str := "tc -s qdisc show"
@@ -631,6 +659,7 @@ func workRxTxPackets() {
		if err != nil {
			log.Error("tc -s qdisc show")
			log.Error(err)
			semOptsDests.Unlock()
			return
		}
		//split line by line
@@ -657,11 +686,9 @@ func workRxTxPackets() {
		// Store throughput metric if entry exists
		var tputStats = make(map[string]interface{})

		for _, u := range opts.dests {
			//get the data for all, parse the output and transmit to each
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
			/*go*/
			tputStats[u.remoteName] = u.processRxTx(qdiscResults["ifb"+u.ifbNumber])
		// Get throughput metrics for each dest
		for _, dest := range opts.dests {
			tputStats[dest.remoteName] = dest.processRxTx(qdiscResults["ifb"+dest.ifbNumber])
		}

		key := metricsBaseKey + PodName + ":throughput"
@@ -670,6 +697,7 @@ func workRxTxPackets() {
		}
		semOptsDests.Unlock()

		// Wait before re-evaluating traffic stats
		time.Sleep(opts.trafficInterval)
	}
}
@@ -677,7 +705,6 @@ func workRxTxPackets() {
func workLogRxTxData() {
	for {
		// only this one affects the destinations based on info in the DB

		semOptsDests.Lock()

		str := "tc -s qdisc show"
@@ -685,6 +712,7 @@ func workLogRxTxData() {
		if err != nil {
			log.Error("tc -s qdisc show")
			log.Error(err)
			semOptsDests.Unlock()
			return
		}
		//split line by line
@@ -708,39 +736,17 @@ func workLogRxTxData() {
			lineIndex = lineIndex + 3
		}

		for _, u := range opts.dests {
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
			/*go*/
			u.logRxTx(qdiscResults["ifb"+u.ifbNumber])
		// Get NC metrics for each dest
		for _, dest := range opts.dests {
			dest.logRxTx(qdiscResults["ifb"+dest.ifbNumber])
		}
		semOptsDests.Unlock()

		// Wait before re-evaluating traffic stats
		time.Sleep(opts.interval)
	}
}

func createPing() ([]podShortElement, error) {
	var podsToPing []podShortElement
	keyName := baseKey + typeNet + ":" + PodName + ":filter*"
	err := rc.ForEachEntry(keyName, createPingHandler, &podsToPing)
	if err != nil {
		return nil, err
	}
	return podsToPing, nil
}

func createPingHandler(key string, fields map[string]string, userData interface{}) error {
	podsToPing := userData.(*[]podShortElement)
	var pod podShortElement
	pod.name = fields["srcName"]
	pod.ipAddr = fields["srcIp"]
	pod.IfbNumber = fields["ifb_uniqueId"]

	*podsToPing = append(*podsToPing, pod)

	return nil
}

func createIfbs() error {
	keyName := baseKey + typeNet + ":" + PodName + ":shape*"
	err := rc.ForEachEntry(keyName, createIfbsHandler, nil)
@@ -775,50 +781,100 @@ func createFilters() error {

func createFiltersHandler(key string, fields map[string]string, userData interface{}) error {
	filterNumber := fields["filter_uniqueId"]
	_, exists := filters[filterNumber]
	ifbNumber := fields["ifb_uniqueId"]
	srcIp := fields["srcIp"]
	srcSvcIp := fields["srcSvcIp"]

	if !exists {
	// Compare with previous filters to determine required action
	podFilterRequired := false
	svcFilterRequired := false

		ipSrc := fields["srcIp"]
		ipSvcSrc := fields["srcSvcIp"]
		//              srcName := fields["srcName"]
		ifbNumber := fields["ifb_uniqueId"]
	prevIps, found := filters[filterNumber]
	if !found {
		// New - only create filters if pod IP is valid
		if srcIp != ipAddrNone {
			podFilterRequired = true
			if srcSvcIp != ipAddrNone {
				svcFilterRequired = true
			}
		}
	} else {
		// Updated - only handle cases where IPs have changed
		if srcIp != prevIps.PodIp && srcSvcIp == prevIps.SvcIp {
			if srcIp == ipAddrNone {
				// Filters can only exist if pod IP is valid
				_ = cmdDeleteFilter(filterNumber)
				delete(filters, filterNumber)
				log.Debug("Filter removed: ", filterNumber, " ifb: ", ifbNumber)
			} else {
				// Remove old filters if pod or svc IP has changed
				podIpChanged := prevIps.PodIp != ipAddrNone && srcIp != prevIps.PodIp
				svcIpChanged := prevIps.SvcIp != ipAddrNone && srcSvcIp != prevIps.SvcIp
				if podIpChanged || svcIpChanged {
					_ = cmdDeleteFilter(filterNumber)
					delete(filters, filterNumber)
					podFilterRequired = true
					log.Debug("Filter removed for update: ", filterNumber, " ifb: ", ifbNumber)
				}

		err := cmdCreateFilter(filterNumber, ifbNumber, ipSrc)
		if err == nil {
				// Create svc filter if necessary
				if srcSvcIp != ipAddrNone {
					svcFilterRequired = true
				}
			}
		}
	}

			if ipSvcSrc != "" {
				err = cmdCreateFilter(filterNumber, ifbNumber, ipSvcSrc)
	// Create filters && update filter map if necessary
	if podFilterRequired || svcFilterRequired {
		if podFilterRequired {
			err := cmdCreateFilter(filterNumber, ifbNumber, srcIp)
			if err != nil {
				log.Error("Failed to create filter with error: ", err.Error())
				return nil
			}
			log.Debug("Filter created: ", filterNumber, " ifb: ", ifbNumber)
		}
		if err == nil {
			filters[filterNumber] = filterNumber
		if svcFilterRequired {
			err := cmdCreateFilter(filterNumber, ifbNumber, srcSvcIp)
			if err != nil {
				log.Error("Failed to create filter with error: ", err.Error())
				_ = cmdDeleteFilter(filterNumber)
				delete(filters, filterNumber)
				return nil
			}
			log.Debug("Filter created: ", filterNumber, " ifb: ", ifbNumber)
		}

		srcIps := new(SrcIps)
		srcIps.PodIp = srcIp
		srcIps.SvcIp = srcSvcIp
		filters[filterNumber] = srcIps
	}

	return nil
}

func deleteUnusedFilters() {
	for index, filterNumber := range filters {
	for filterNumber := range filters {
		keyName := baseKey + typeNet + ":" + PodName + ":filter:" + filterNumber
		if !rc.EntryExists(keyName) {
			log.Debug("filter removed: ", filterNumber)
			// Remove old filter
			_ = cmdDeleteFilter(filterNumber)
			delete(filters, index)
			delete(filters, filterNumber)
		}
	}
}

func deleteUnusedIfbs() {
	for index, ifbNumber := range ifbs {
	for ifbNumber := range ifbs {
		keyName := baseKey + typeNet + ":" + PodName + ":shape:" + ifbNumber
		if !rc.EntryExists(keyName) {
			log.Debug("ifb removed: ", ifbNumber)
			// Remove associated Ifb
			_ = cmdDeleteIfb(ifbNumber)
			delete(ifbs, index)
			delete(ifbs, ifbNumber)
		}
	}
}
@@ -960,33 +1016,27 @@ func cmdDeleteFilter(filterNumber string) error {
	return nil
}

func initializeOnFirstPass() error {

	if firstTimePass {
		nbAppliedOperations++
func initializeFilters() error {
	_, err := cmdExec("tc qdisc replace dev eth0 root handle 1: netem")
	if err != nil {
		log.Info("Error: ", err)
		return err
	}
		nbAppliedOperations++
	_, err = cmdExec("tc qdisc replace dev eth0 handle ffff: ingress")
	if err != nil {
		log.Info("Error: ", err)
		return err
	}
		firstTimePass = false
	}
	return nil
}

func cmdCreateFilter(filterNumber string, ifbNumber string, ipSrc string) error {
func cmdCreateFilter(filterNumber string, ifbNumber string, srcIp string) error {

	//"tc filter add dev eth0 parent ffff: protocol ip prio $filterNumber u32 match ip src $ipsrc match u32 0 0 action mirred egress redirect dev $ifb$ifbnumber"
	str := "tc filter add dev eth0 parent ffff: protocol ip prio " + filterNumber + " u32 match ip src " + ipSrc + " match u32 0 0 action mirred egress redirect dev ifb" + ifbNumber
	//"tc filter add dev eth0 parent ffff: protocol ip prio $filterNumber u32 match ip src $srcIp match u32 0 0 action mirred egress redirect dev $ifb$ifbnumber"
	str := "tc filter add dev eth0 parent ffff: protocol ip prio " + filterNumber + " u32 match ip src " + srcIp + " match u32 0 0 action mirred egress redirect dev ifb" + ifbNumber

	//fonction must be a replace... a replace Adds if not there or replace if existing
	//"tc filter replace dev eth0 parent ffff: protocol ip prio $filterNumber u32 match ip src $ipsrc match u32 0 0 action mirred egress redirect dev $ifb$ifbnumber"
	//"tc filter replace dev eth0 parent ffff: protocol ip prio $filterNumber u32 match ip src $srcIp match u32 0 0 action mirred egress redirect dev $ifb$ifbnumber"
	//str := "tc filter replace dev eth0 parent ffff: protocol ip prio " + filterNumber + " handle 800::800 u32 match u32 0 0 action mirred egress redirect dev ifb" + ifbNumber
	nbAppliedOperations++
	_, err := cmdExec(str)