Unverified Commit 3e2eac25 authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #57 from pastorsx/sp_dev_concur_map_sidecar

Fix for concurrent map access crash in TC sidecar
parents d4233489 0526d1a6
Loading
Loading
Loading
Loading
+6 −2
Original line number Diff line number Diff line
@@ -145,9 +145,9 @@ func (u *destination) compute() (st stat) {

	//string for mapping src:dest
	mapName := u.hostName + ":" + u.remoteName
	semLatencyMap <- 1
	semLatencyMap.Lock()
	latestLatencyResultsMap[mapName] = lat
	<-semLatencyMap
	semLatencyMap.Unlock()

	// Log measurment
	log.WithFields(log.Fields{
@@ -273,7 +273,11 @@ func (u *destination) logRxTx() {
	}
	// Store latency metric
	srcDest := u.hostName + ":" + u.remoteName

	semLatencyMap.Lock()
	err = metricStore.SetLatencyMetric(u.hostName, u.remoteName, latestLatencyResultsMap[srcDest])
	semLatencyMap.Unlock()

	if err != nil {
		log.Error("Failed to set latency metric")
	}
+16 −7
Original line number Diff line number Diff line
@@ -23,6 +23,7 @@ import (
	"os"
	"os/exec"
	"strings"
	"sync"
	"time"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
@@ -69,8 +70,8 @@ type podShortElement struct {
	IfbNumber string
}

var sem = make(chan int, 1)
var semLatencyMap = make(chan int, 1)
var semOptsDests sync.Mutex
var semLatencyMap sync.Mutex

var opts = struct {
	timeout         time.Duration
@@ -110,7 +111,7 @@ var serviceChains = map[string]string{}
var ifbs = map[string]string{}
var filters = map[string]string{}
var netcharMap = map[string]*NetChar{}
var latestLatencyResultsMap = map[string]int32{}
var latestLatencyResultsMap map[string]int32

var measurementsRunning = false
var flushRequired = false
@@ -204,6 +205,10 @@ func initMeepSidecar() error {

	log.Info("Successfully subscribed to Pub/Sub events")

	semLatencyMap.Lock()
	latestLatencyResultsMap = make(map[string]int32)
	semLatencyMap.Unlock()

	return nil
}

@@ -526,7 +531,9 @@ func callPing() {
				},
			}

			semOptsDests.Lock()
			opts.dests = append(opts.dests, &dst)
			semOptsDests.Unlock()
		}
	}

@@ -545,6 +552,7 @@ func callPing() {
func workLatency() {
	for {

		semOptsDests.Lock()
		for i, u := range opts.dests {
			//starting 2 threads, one for the pings, one for the computing part
			go func(u *destination, i int) {
@@ -554,6 +562,7 @@ func workLatency() {
				u.compute()
			}(u, i)
		}
		semOptsDests.Unlock()

		time.Sleep(opts.interval)
	}
@@ -563,7 +572,7 @@ func workRxTxPackets() {
	for {
		//only this one affects the destinations based on info in the DB

		sem <- 1
		semOptsDests.Lock()

		for i, u := range opts.dests {
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
@@ -571,7 +580,7 @@ func workRxTxPackets() {
				u.processRxTx()
			}(u, i)
		}
		<-sem
		semOptsDests.Unlock()

		time.Sleep(opts.trafficInterval)
	}
@@ -581,7 +590,7 @@ func workLogRxTxData() {
	for {
		//only this one affects the destinations based on info in the DB

		sem <- 1
		semOptsDests.Lock()

		for i, u := range opts.dests {
			//starting 1 thread for getting the rx-tx info and computing the appropriate metrics
@@ -589,7 +598,7 @@ func workLogRxTxData() {
				u.logRxTx()
			}(u, i)
		}
		<-sem
		semOptsDests.Unlock()

		time.Sleep(opts.interval)
	}