Unverified Commit 7a1a811d authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #283 from dilallkx/kd_sp29_data_locality

Multi-MEP Support
parents 75252c12 a404a1cf
Loading
Loading
Loading
Loading
+2 −2
Original line number Diff line number Diff line
@@ -9,8 +9,8 @@ require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0 // indirect
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0 // indirect
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/antihax/optional v1.0.0 // indirect
	github.com/google/uuid v1.2.0
+2 −0
Original line number Diff line number Diff line
@@ -2,6 +2,7 @@ cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMT
cloud.google.com/go v0.34.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/Knetic/govaluate v3.0.1-0.20171022003610-9aa49832a739+incompatible/go.mod h1:r7JcOSlj0wfOMncg0iLm8Leh48TZaKVeNIfJntJ2wa0=
github.com/RyanCarrier/dijkstra v0.0.0-20190726134004-b51cadb5ae52 h1:trnwuu/Q8T59kgRjXcSDBODnyZP9wes+bnLn0lx4PgM=
github.com/RyanCarrier/dijkstra v0.0.0-20190726134004-b51cadb5ae52/go.mod h1:DdR6ymcLl8+sN/XOVNjnYO1NDYfgHskGjreZUDuQCTY=
github.com/RyanCarrier/dijkstra-1 v0.0.0-20170512020943-0e5801a26345/go.mod h1:OK4EvWJ441LQqGzed5NGB6vKBAE34n3z7iayPcEwr30=
github.com/Shopify/sarama v1.19.0/go.mod h1:FVkBWblsNy7DGZRfXLU0O9RCGt5g3g3yEuWXgklEdEo=
@@ -29,6 +30,7 @@ github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+Ce
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/bgentry/speakeasy v0.1.0/go.mod h1:+zsyZBPWlz7T6j88CTgSN5bM796AkVf0kBD4zp0CCIs=
github.com/blang/semver v3.5.1+incompatible h1:cQNTCjp13qL8KC3Nbxr/y2Bqb63oX6wdnnjpJbkM4JQ=
github.com/blang/semver v3.5.1+incompatible/go.mod h1:kRBLl5iJ+tD4TcOOxsy/0fnwebNt5EWlYSAyrTnjyyk=
github.com/casbin/casbin/v2 v2.1.2/go.mod h1:YcPU1XXisHhLzuxH9coDNf2FbKpjGlbCg3n9yuLkIJQ=
github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM=
+110 −4
Original line number Diff line number Diff line
@@ -18,6 +18,7 @@ package server

import (
	"errors"
	"net/url"
	"os"
	"strings"
	"sync"
@@ -25,13 +26,27 @@ import (
	appInfo "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/app-info"
	appSupport "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/app-support"
	servMgmt "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-app-enablement/server/service-mgmt"
	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mod "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-model"
	mq "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-mq"
)

const serviceName = "Edge Platform Application Enablement Service"
const moduleName = "meep-app-enablement"
const defaultMepName = "global"

var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"

var mutex sync.Mutex
var hostUrl *url.URL
var sandboxName string
var mepName string = defaultMepName
var handlerId int
var mqLocal *mq.MsgQueue
var activeModel *mod.Model
var currentStoreName = ""

// Init - EPAE Service initialization
func Init() (err error) {
@@ -48,25 +63,66 @@ func Init() (err error) {
	}
	log.Info("MEEP_SANDBOX_NAME: ", sandboxName)

	err = servMgmt.Init(&mutex)
	// Get MEP name
	mepNameEnv := strings.TrimSpace(os.Getenv("MEEP_MEP_NAME"))
	if mepNameEnv != "" {
		mepName = mepNameEnv
	}
	log.Info("MEEP_MEP_NAME: ", mepName)

	// hostUrl is the url of the node serving the resourceURL
	// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
	hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
	if err != nil || hostUrl == nil || hostUrl.String() == "" {
		hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
		if err != nil {
			hostUrl = new(url.URL)
		}
	}
	log.Info("MEEP_HOST_URL: ", hostUrl)

	// Create new active scenario model
	modelCfg := mod.ModelCfg{
		Name:      "activeScenario",
		Namespace: sandboxName,
		Module:    moduleName,
		UpdateCb:  nil,
		DbAddr:    redisAddr,
	}
	activeModel, err = mod.NewModel(modelCfg)
	if err != nil {
		log.Error("Failed to create model: ", err.Error())
		return err
	}
	log.Info("Active Scenario Model created")

	err = servMgmt.Init(sandboxName, mepName, hostUrl, &mutex)
	if err != nil {
		return err
	}

	err = appSupport.Init(sandboxName, mepName, hostUrl, &mutex)
	if err != nil {
		return err
	}

	err = appSupport.Init(&mutex)
	err = appInfo.Init(sandboxName, mepName, hostUrl, &mutex)
	if err != nil {
		return err
	}

	err = appInfo.Init(&mutex)
	// Create message queue
	mqLocal, err = mq.NewMsgQueue(mq.GetLocalName(sandboxName), moduleName, sandboxName, redisAddr)
	if err != nil {
		log.Error("Failed to create Message Queue with error: ", err)
		return err
	}
	log.Info("Message Queue created")

	return nil
}

// Run - Start
// Run - Start App Enablement
func Run() (err error) {

	err = servMgmt.Run()
@@ -84,5 +140,55 @@ func Run() (err error) {
		return err
	}

	// Register Message Queue handler
	handler := mq.MsgHandler{Handler: msgHandler, UserData: nil}
	handlerId, err = mqLocal.RegisterHandler(handler)
	if err != nil {
		log.Error("Failed to register local Msg Queue listener: ", err.Error())
		return err
	}
	log.Info("Registered local Msg Queue listener")

	// Initalize metric store
	updateStoreName()

	return nil
}

// Stop - Stop App Enablement
func Stop() {
	mqLocal.UnregisterHandler(handlerId)
}

// Message Queue handler
func msgHandler(msg *mq.Msg, userData interface{}) {
	switch msg.Message {
	case mq.MsgScenarioActivate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		updateStoreName()
	case mq.MsgScenarioTerminate:
		log.Debug("RX MSG: ", mq.PrintMsg(msg))
		updateStoreName()
	default:
	}
}

func updateStoreName() {
	mutex.Lock()
	defer mutex.Unlock()

	// Sync with active scenario store
	activeModel.UpdateScenario()

	// Update store names
	storeName := activeModel.GetScenarioName()
	if currentStoreName != storeName {
		currentStoreName = storeName

		logComponent := moduleName
		if mepName != defaultMepName {
			logComponent = moduleName + "-" + mepName
		}
		_ = httpLog.ReInit(logComponent, sandboxName, storeName, redisAddr, influxAddr)
	}
}
+7 −45
Original line number Diff line number Diff line
@@ -22,8 +22,6 @@ import (
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strings"
	"sync"
	"time"

@@ -56,8 +54,6 @@ var baseKey string

var expiryTicker *time.Ticker

var nextAppInstanceIdAvailable int

type ApplicationInfoList struct {
	ApplicationInfos []ApplicationInfo
	filterParameters *FilterParameters
@@ -68,38 +64,12 @@ type FilterParameters struct {
	appState string
}

func Init(globalMutex *sync.Mutex) (err error) {
func Init(sandbox string, mep string, host *url.URL, globalMutex *sync.Mutex) (err error) {
	sandboxName = sandbox
	mepName = mep
	hostUrl = host
	mutex = globalMutex

	// Retrieve Sandbox name from environment variable
	sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
	if sandboxNameEnv != "" {
		sandboxName = sandboxNameEnv
	}
	if sandboxName == "" {
		err = errors.New("MEEP_SANDBOX_NAME env variable not set")
		log.Error(err.Error())
		return err
	}

	// hostUrl is the url of the node serving the resourceURL
	// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
	hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
	if err != nil || hostUrl == nil || hostUrl.String() == "" {
		hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
		if err != nil {
			hostUrl = new(url.URL)
		}
	}
	log.Info("MEEP_HOST_URL: ", hostUrl)

	// Get MEP name
	mepNameEnv := strings.TrimSpace(os.Getenv("MEEP_MEP_NAME"))
	if mepNameEnv != "" {
		mepName = mepNameEnv
	}
	log.Info("MEEP_MEP_NAME: ", mepName)

	// Set base path
	if mepName == defaultMepName {
		basePath = "/" + sandboxName + "/" + appInfoBasePath
@@ -119,8 +89,6 @@ func Init(globalMutex *sync.Mutex) (err error) {
	_ = rc.DBFlush(baseKey)
	log.Info("Connected to Redis DB")

	reInit()

	expiryTicker = time.NewTicker(time.Second)
	go func() {
		for range expiryTicker.C {
@@ -130,11 +98,6 @@ func Init(globalMutex *sync.Mutex) (err error) {
	return nil
}

// reInit - finds the value already in the DB to repopulate local stored info
func reInit() {
	nextAppInstanceIdAvailable = 1
}

// Run - Start
func Run() (err error) {
	return nil
@@ -146,10 +109,6 @@ func Stop() (err error) {
}

func getNewInstanceId() (string, error) {
	/*	appInstanceId := strconv.Itoa(nextAppInstanceIdAvailable)
		nextAppInstanceIdAvailable++
		return appInstanceId
	*/
	//allow 3 tries, if not return an error
	maxNbRetries := 3
	for try := maxNbRetries; try > 0; try-- {
@@ -353,6 +312,9 @@ func applicationsAppInstanceIdDELETE(w http.ResponseWriter, r *http.Request) {
	vars := mux.Vars(r)
	appInstanceId := vars["appInstanceId"]

	mutex.Lock()
	defer mutex.Unlock()

	// Check if App instance exists
	fields, err := rc.GetEntry(baseKey + ":app:" + appInstanceId + ":info")
	if err != nil || len(fields) == 0 {
+47 −92
Original line number Diff line number Diff line
@@ -23,7 +23,6 @@ import (
	"fmt"
	"net/http"
	"net/url"
	"os"
	"strconv"
	"strings"
	"sync"
@@ -69,39 +68,12 @@ var appTerminationGracefulTimeoutMap = map[string]*time.Ticker{}
var appTerminationNotificationSubscriptionMap = map[int]*AppTerminationNotificationSubscription{}
var nextSubscriptionIdAvailable int

func Init(globalMutex *sync.Mutex) (err error) {
func Init(sandbox string, mep string, host *url.URL, globalMutex *sync.Mutex) (err error) {
	sandboxName = sandbox
	mepName = mep
	hostUrl = host
	mutex = globalMutex

	// Retrieve Sandbox name from environment variable
	sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
	if sandboxNameEnv != "" {
		sandboxName = sandboxNameEnv
	}
	if sandboxName == "" {
		err = errors.New("MEEP_SANDBOX_NAME env variable not set")
		log.Error(err.Error())
		return err
	}
	log.Info("MEEP_SANDBOX_NAME: ", sandboxName)

	// hostUrl is the url of the node serving the resourceURL
	// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
	hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
	if err != nil || hostUrl == nil || hostUrl.String() == "" {
		hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
		if err != nil {
			hostUrl = new(url.URL)
		}
	}
	log.Info("MEEP_HOST_URL: ", hostUrl)

	// Get MEP name
	mepNameEnv := strings.TrimSpace(os.Getenv("MEEP_MEP_NAME"))
	if mepNameEnv != "" {
		mepName = mepNameEnv
	}
	log.Info("MEEP_MEP_NAME: ", mepName)

	// Set base path
	if mepName == defaultMepName {
		basePath = "/" + sandboxName + "/" + mappsupportBasePath
@@ -121,19 +93,14 @@ func Init(globalMutex *sync.Mutex) (err error) {
	_ = rc.DBFlush(baseKey)
	log.Info("Connected to Redis DB")

	reInit()

	return nil
}

// reInit - finds the value already in the DB to repopulate local stored info
// NOTE: Init is flushing everything so this is a non-operation code, but if a sbi is added that tracks Activation/Termination of scenarios, then this should become handy, leaving it there for future code updates if needed
func reInit() {
	//next available subsId will be overrriden if subscriptions already existed
	// Initialize subscription ID count
	nextSubscriptionIdAvailable = 1

	keyName := baseKey + ":app:*:" + mappsupportKey + ":sub:*"
	_ = rc.ForEachJSONEntry(keyName, repopulateAppTerminationNotificationSubscriptionMap, nil)
	// Initialize local termination notification subscription map from DB
	key := baseKey + ":app:*:" + mappsupportKey + ":sub:*"
	_ = rc.ForEachJSONEntry(key, repopulateAppTerminationNotificationSubscriptionMap, nil)

	return nil
}

// Run - Start APP support
@@ -344,7 +311,7 @@ func updateAllServices(appInstanceId string, state msmgmt.ServiceState) error {
	if err != nil {
		return err
	}
	for _, sInfo := range sInfoList.ServiceInfos {
	for _, sInfo := range sInfoList.Services {
		serviceId := sInfo.SerInstanceId
		sInfo.State = &state
		err = rc.JSONSetEntry(baseKey+":app:"+appInstanceId+":svc:"+serviceId, ".", msmgmt.ConvertServiceInfoToJson(&sInfo))
@@ -369,7 +336,7 @@ func populateServiceInfoList(key string, jsonInfo string, sInfoList interface{})
	if err != nil {
		return err
	}
	data.ServiceInfos = append(data.ServiceInfos, sInfo)
	data.Services = append(data.Services, sInfo)
	return nil
}

@@ -577,24 +544,14 @@ func SendAppTerminationNotification(appInstanceId string, gracefulTimeout int32)
	if gracefulTimeout == 0 {
		gracefulTimeout = DEFAULT_GRACEFUL_TIMEOUT
	}
	checkAppTermNotification(appInstanceId, gracefulTimeout, true)
}

func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needMutex bool) {
	if needMutex {
		mutex.Lock()
		defer mutex.Unlock()
	}
	//check all that applies
	// Filter subscriptions
	for subsId, sub := range appTerminationNotificationSubscriptionMap {
		if sub != nil {
			//find matching criteria
			match := false
			if sub.AppInstanceId == appInstanceId {
				match = true
		// Filter subscriptions
		if sub == nil || sub.AppInstanceId != appInstanceId {
			continue
		}

			if match {
		subsIdStr := strconv.Itoa(subsId)

		var notif AppTerminationNotification
@@ -628,8 +585,6 @@ func checkAppTermNotification(appInstanceId string, gracefulTimeout int32, needM
		}()
	}
}
	}
}

func sendAppTermNotification(notifyUrl string, notification AppTerminationNotification) {
	startTime := time.Now()
Loading