Loading go-apps/meep-mon-engine/server/mon-engine.go +99 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import ( "net/http" "os" "strings" "sync" "time" dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -30,6 +32,8 @@ import ( sbs "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-store" v1 "k8s.io/api/core/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" Loading Loading @@ -63,6 +67,12 @@ type MonEngineInfo struct { StartTime string } type Sandbox struct { Releases map[string]bool StartTime time.Time Running bool } const serviceName = "Monitoring Engine" const moduleName = "meep-mon-engine" const moduleNamespace = "default" Loading @@ -77,6 +87,15 @@ const EVENT_POD_ADDED = 0 const EVENT_POD_MODIFIED = 1 const EVENT_POD_DELETED = 2 // Metrics var ( metricSboxCreateDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "mon_engine_sbox_create_duration", Help: "A histogram of sandbox creation durations", Buckets: prometheus.LinearBuckets(10, 5, 5), }) ) var pod_event_str = [3]string{"pod added", "pod modified", "pod deleted"} var rc *redis.Connector var redisAddr = "meep-redis-master:6379" Loading @@ -85,6 +104,9 @@ var stopChan = make(chan struct{}) var mqGlobal *mq.MsgQueue var handlerId int var sandboxStore *sbs.SandboxStore var mutex sync.Mutex var sandboxes map[string]*Sandbox var depPodsList []string var corePodsList []string Loading Loading @@ -164,6 +186,9 @@ func Init() (err error) { } log.Info("Connected to Sandbox Store") // Initialize sandbox map sandboxes = make(map[string]*Sandbox) return nil } Loading @@ -174,6 +199,7 @@ func Run() (err error) { if sboxMap, err := sandboxStore.GetAll(); err == nil { for _, sbox := range sboxMap { addExpectedPods(sbox.Name) createSandbox(sbox.Name) } } Loading Loading @@ -205,9 +231,11 @@ func msgHandler(msg *mq.Msg, userData interface{}) { case mq.MsgSandboxCreate: log.Debug("RX MSG: ", mq.PrintMsg(msg)) addExpectedPods(msg.Payload[fieldSandboxName]) createSandbox(msg.Payload[fieldSandboxName]) case mq.MsgSandboxDestroy: log.Debug("RX MSG: ", mq.PrintMsg(msg)) removeExpectedPods(msg.Payload[fieldSandboxName]) deleteSandbox(msg.Payload[fieldSandboxName]) default: log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg)) } Loading Loading @@ -355,16 +383,17 @@ func processEvent(obj interface{}, reason int) { // Add, update or remove entry in DB only if core or scenario pod if monEngineInfo.PodType != notFoundStr { if reason == EVENT_POD_DELETED { deleteEntryInDB(monEngineInfo) deleteEntryInDB(&monEngineInfo) } else { addOrUpdateEntryInDB(monEngineInfo) addOrUpdateEntryInDB(&monEngineInfo) monitorSboxCreation(&monEngineInfo) } } else { log.Debug("Ignoring non-AdvantEDGE pod: ", monEngineInfo.PodName) } } func addOrUpdateEntryInDB(monEngineInfo MonEngineInfo) { func addOrUpdateEntryInDB(monEngineInfo *MonEngineInfo) { // Populate rule fields fields := make(map[string]interface{}) fields["type"] = monEngineInfo.PodType Loading Loading @@ -396,7 +425,7 @@ func addOrUpdateEntryInDB(monEngineInfo MonEngineInfo) { } } func deleteEntryInDB(monEngineInfo MonEngineInfo) { func deleteEntryInDB(monEngineInfo *MonEngineInfo) { // Make unique key key := baseKey + monEngineInfo.Namespace + ":" + monEngineInfo.PodType + ":" + monEngineInfo.PodName Loading Loading @@ -441,6 +470,43 @@ func k8sConnect() (err error) { return nil } func monitorSboxCreation(monEngineInfo *MonEngineInfo) { mutex.Lock() defer mutex.Unlock() // Find matching sandbox entry sboxName := monEngineInfo.Namespace if sbox, found := sandboxes[sboxName]; found { // Ignore if sbox already running if !sbox.Running { // Set release running state if _, exists := sbox.Releases[monEngineInfo.Release]; exists { sbox.Releases[monEngineInfo.Release] = (monEngineInfo.LogicalState == "Running") // Check if sandbox is running sboxRunning := true for _, running := range sbox.Releases { if !running { sboxRunning = false break } } // If all releases are running, log sandbox creation time metric if sboxRunning { sbox.Running = true creationTime := float64(time.Since(sbox.StartTime).Milliseconds()) / 1000.0 log.Info("Sbox: ", sboxName, " creationTime: ", creationTime) metricSboxCreateDuration.Observe(creationTime) } } } } } // Retrieve POD states // GET /states func meGetStates(w http.ResponseWriter, r *http.Request) { Loading Loading @@ -636,3 +702,32 @@ func removeExpectedPods(sandboxName string) { delete(expectedSboxPods, podName) } } // Create new sandbox to monitor func createSandbox(sandboxName string) { mutex.Lock() defer mutex.Unlock() if _, exists := sandboxes[sandboxName]; !exists { sbox := new(Sandbox) sbox.Running = false sbox.Releases = make(map[string]bool) sbox.StartTime = time.Now() for _, pod := range sboxPodsList { sbox.Releases[pod] = false } sandboxes[sandboxName] = sbox log.Info("Created new sandbox to monitor: ", sandboxName) } } // Delete monitored sandbox func deleteSandbox(sandboxName string) { mutex.Lock() defer mutex.Unlock() if _, exists := sandboxes[sandboxName]; exists { delete(sandboxes, sandboxName) log.Info("Removed sandbox to monitor: ", sandboxName) } } Loading
go-apps/meep-mon-engine/server/mon-engine.go +99 −4 Original line number Diff line number Diff line Loading @@ -22,6 +22,8 @@ import ( "net/http" "os" "strings" "sync" "time" dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" Loading @@ -30,6 +32,8 @@ import ( sbs "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-store" v1 "k8s.io/api/core/v1" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/fields" "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" Loading Loading @@ -63,6 +67,12 @@ type MonEngineInfo struct { StartTime string } type Sandbox struct { Releases map[string]bool StartTime time.Time Running bool } const serviceName = "Monitoring Engine" const moduleName = "meep-mon-engine" const moduleNamespace = "default" Loading @@ -77,6 +87,15 @@ const EVENT_POD_ADDED = 0 const EVENT_POD_MODIFIED = 1 const EVENT_POD_DELETED = 2 // Metrics var ( metricSboxCreateDuration = promauto.NewHistogram(prometheus.HistogramOpts{ Name: "mon_engine_sbox_create_duration", Help: "A histogram of sandbox creation durations", Buckets: prometheus.LinearBuckets(10, 5, 5), }) ) var pod_event_str = [3]string{"pod added", "pod modified", "pod deleted"} var rc *redis.Connector var redisAddr = "meep-redis-master:6379" Loading @@ -85,6 +104,9 @@ var stopChan = make(chan struct{}) var mqGlobal *mq.MsgQueue var handlerId int var sandboxStore *sbs.SandboxStore var mutex sync.Mutex var sandboxes map[string]*Sandbox var depPodsList []string var corePodsList []string Loading Loading @@ -164,6 +186,9 @@ func Init() (err error) { } log.Info("Connected to Sandbox Store") // Initialize sandbox map sandboxes = make(map[string]*Sandbox) return nil } Loading @@ -174,6 +199,7 @@ func Run() (err error) { if sboxMap, err := sandboxStore.GetAll(); err == nil { for _, sbox := range sboxMap { addExpectedPods(sbox.Name) createSandbox(sbox.Name) } } Loading Loading @@ -205,9 +231,11 @@ func msgHandler(msg *mq.Msg, userData interface{}) { case mq.MsgSandboxCreate: log.Debug("RX MSG: ", mq.PrintMsg(msg)) addExpectedPods(msg.Payload[fieldSandboxName]) createSandbox(msg.Payload[fieldSandboxName]) case mq.MsgSandboxDestroy: log.Debug("RX MSG: ", mq.PrintMsg(msg)) removeExpectedPods(msg.Payload[fieldSandboxName]) deleteSandbox(msg.Payload[fieldSandboxName]) default: log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg)) } Loading Loading @@ -355,16 +383,17 @@ func processEvent(obj interface{}, reason int) { // Add, update or remove entry in DB only if core or scenario pod if monEngineInfo.PodType != notFoundStr { if reason == EVENT_POD_DELETED { deleteEntryInDB(monEngineInfo) deleteEntryInDB(&monEngineInfo) } else { addOrUpdateEntryInDB(monEngineInfo) addOrUpdateEntryInDB(&monEngineInfo) monitorSboxCreation(&monEngineInfo) } } else { log.Debug("Ignoring non-AdvantEDGE pod: ", monEngineInfo.PodName) } } func addOrUpdateEntryInDB(monEngineInfo MonEngineInfo) { func addOrUpdateEntryInDB(monEngineInfo *MonEngineInfo) { // Populate rule fields fields := make(map[string]interface{}) fields["type"] = monEngineInfo.PodType Loading Loading @@ -396,7 +425,7 @@ func addOrUpdateEntryInDB(monEngineInfo MonEngineInfo) { } } func deleteEntryInDB(monEngineInfo MonEngineInfo) { func deleteEntryInDB(monEngineInfo *MonEngineInfo) { // Make unique key key := baseKey + monEngineInfo.Namespace + ":" + monEngineInfo.PodType + ":" + monEngineInfo.PodName Loading Loading @@ -441,6 +470,43 @@ func k8sConnect() (err error) { return nil } func monitorSboxCreation(monEngineInfo *MonEngineInfo) { mutex.Lock() defer mutex.Unlock() // Find matching sandbox entry sboxName := monEngineInfo.Namespace if sbox, found := sandboxes[sboxName]; found { // Ignore if sbox already running if !sbox.Running { // Set release running state if _, exists := sbox.Releases[monEngineInfo.Release]; exists { sbox.Releases[monEngineInfo.Release] = (monEngineInfo.LogicalState == "Running") // Check if sandbox is running sboxRunning := true for _, running := range sbox.Releases { if !running { sboxRunning = false break } } // If all releases are running, log sandbox creation time metric if sboxRunning { sbox.Running = true creationTime := float64(time.Since(sbox.StartTime).Milliseconds()) / 1000.0 log.Info("Sbox: ", sboxName, " creationTime: ", creationTime) metricSboxCreateDuration.Observe(creationTime) } } } } } // Retrieve POD states // GET /states func meGetStates(w http.ResponseWriter, r *http.Request) { Loading Loading @@ -636,3 +702,32 @@ func removeExpectedPods(sandboxName string) { delete(expectedSboxPods, podName) } } // Create new sandbox to monitor func createSandbox(sandboxName string) { mutex.Lock() defer mutex.Unlock() if _, exists := sandboxes[sandboxName]; !exists { sbox := new(Sandbox) sbox.Running = false sbox.Releases = make(map[string]bool) sbox.StartTime = time.Now() for _, pod := range sboxPodsList { sbox.Releases[pod] = false } sandboxes[sandboxName] = sbox log.Info("Created new sandbox to monitor: ", sandboxName) } } // Delete monitored sandbox func deleteSandbox(sandboxName string) { mutex.Lock() defer mutex.Unlock() if _, exists := sandboxes[sandboxName]; exists { delete(sandboxes, sandboxName) log.Info("Removed sandbox to monitor: ", sandboxName) } }