Commit fff6786b authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

integrated message queue in all microservices

parent 7cf030c4
Loading
Loading
Loading
Loading
+3 −0
Original line number Diff line number Diff line
@@ -208,6 +208,9 @@ repo:
      meep-model:
        src: go-packages/meep-model
        lint: true
      meep-mq:
        src: go-packages/meep-mq
        lint: true
      meep-net-char-mgr:
        src: go-packages/meep-net-char-mgr
        lint: true
+2 −0
Original line number Diff line number Diff line
@@ -7,6 +7,7 @@ require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-loc-serv-notification-client v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/gorilla/handlers v1.4.0
	github.com/gorilla/mux v1.7.3
@@ -21,5 +22,6 @@ replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store => ../../go-packages/meep-metric-store
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model => ../../go-packages/meep-model
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq => ../../go-packages/meep-mq
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+58 −20
Original line number Diff line number Diff line
@@ -17,18 +17,23 @@
package sbi

import (
	"errors"
	"os"
	"strings"

	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
)

const module string = "loc-serv-sbi"
const moduleName string = "meep-loc-serv"
const moduleName string = "meep-loc-serv-sbi"
const redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"

type LocServSbi struct {
	sandboxName             string
	msgQueue                *mq.MsgQueue
	handlerId               int
	activeModel             *mod.Model
	updateUserInfoCB        func(string, string, string)
	updateZoneInfoCB        func(string, int, int, int)
@@ -45,8 +50,26 @@ func Init(updateUserInfo func(string, string, string), updateZoneInfo func(strin
	// Create new SBI instance
	sbi = new(LocServSbi)

	// Create new Model
	sbi.activeModel, err = mod.NewModel(redisAddr, module, "activeScenario")
	// Retrieve Sandbox name from environment variable
	sbi.sandboxName = strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
	if sbi.sandboxName == "" {
		err = errors.New("MEEP_SANDBOX_NAME env variable not set")
		log.Error(err.Error())
		return err
	}
	log.Info("MEEP_SANDBOX_NAME: ", sbi.sandboxName)

	// Create message queue
	sbi.msgQueue, err = mq.NewMsgQueue(moduleName, sbi.sandboxName, redisAddr)
	if err != nil {
		log.Error("Failed to create Message Queue with error: ", err)
		return err
	}
	log.Info("Message Queue created")

	// Create new active scenario model
	modelCfg := mod.ModelCfg{Name: "activeScenario", Module: moduleName, UpdateCb: nil, DbAddr: redisAddr}
	sbi.activeModel, err = mod.NewModel(modelCfg)
	if err != nil {
		log.Error("Failed to create model: ", err.Error())
		return err
@@ -63,34 +86,49 @@ func Init(updateUserInfo func(string, string, string), updateZoneInfo func(strin
// Run - MEEP Location Service execution
func Run() (err error) {

	// Listen for Model updates
	err = sbi.activeModel.Listen(eventHandler)
	// Register Message Queue handler
	handler := mq.MsgHandler{Scope: mq.ScopeLocal, Handler: msgHandler, UserData: nil}
	sbi.handlerId, err = sbi.msgQueue.RegisterHandler(handler)
	if err != nil {
		log.Error("Failed to listen for model updates: ", err.Error())
		log.Error("Failed to listen for sandbox updates: ", err.Error())
		return err
	}

	return nil
}

func eventHandler(channel string, payload string) {
	// Handle Message according to Rx Channel
	switch channel {

	// MEEP Ctrl Engine active scenario update Channel
	case mod.ActiveScenarioEvents:
		log.Debug("Event received on channel: ", mod.ActiveScenarioEvents, " payload: ", payload)
		if payload == mod.EventTerminate {
			sbi.cleanUpCB()
		} else {
// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
	case mq.MsgScenarioActivate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioUpdate()
		}
	case mq.MsgScenarioUpdate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioUpdate()
	case mq.MsgScenarioTerminate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		processActiveScenarioTerminate()
	default:
		log.Warn("Unsupported channel", " payload: ", payload)
		log.Trace("Ignoring unsupported message: ", mq.PrintMsg(msg))
	}
}

func processActiveScenarioTerminate() {
	log.Debug("processActiveScenarioTerminate")

	// Sync with active scenario store
	sbi.activeModel.UpdateScenario()

	sbi.cleanUpCB()
}

func processActiveScenarioUpdate() {
	log.Debug("processActiveScenarioUpdate")

	// Sync with active scenario store
	sbi.activeModel.UpdateScenario()

	uePerNetLocMap := make(map[string]int)
	uePerZoneMap := make(map[string]int)
	poaPerZoneMap := make(map[string]int)
+2 −0
Original line number Diff line number Diff line
@@ -9,6 +9,7 @@ require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics-engine-notification-client v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/fortytw2/leaktest v1.3.0 // indirect
	github.com/google/go-cmp v0.3.1 // indirect
@@ -26,5 +27,6 @@ replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metric-store => ../../go-packages/meep-metric-store
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics-engine-notification-client => ../../go-packages/meep-metrics-engine-notification-client
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model => ../../go-packages/meep-model
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq => ../../go-packages/meep-mq
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+1 −0
Original line number Diff line number Diff line
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/InterDigitalInc/AdvantEDGE v1.4.0 h1:MnzK3dl4hUvfhZ4wYarVmJzE5IMO2Q63dyDkcQtEv1Q=
github.com/KromDaniel/jonson v0.0.0-20180630143114-d2f9c3c389db/go.mod h1:RU+6d0CNIRSp6yo1mXLIIrnFa/3LHhvcDVLVJyovptM=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351 h1:1u1XrfCBnY+GijnyU6O1k4odp5TnqZQTsp5v7+n/E4Y=
github.com/KromDaniel/rejonson v0.0.0-20180822072824-00b5bcf2b351/go.mod h1:HxwfbuElTuGf+/uKZfjJrCnv0BmmpkPJDI7gBwj1KkM=
Loading