Commit 2ba4804e authored by Kevin Di Lallo's avatar Kevin Di Lallo
Browse files

subscriptions package fixes

parent a1e66d62
Loading
Loading
Loading
Loading
+21 −7
Original line number Diff line number Diff line
@@ -218,6 +218,11 @@ func Init() (err error) {
	subMgrCfg.Module = moduleName
	subMgrCfg.Sandbox = sandboxName
	subMgrCfg.Mep = mepName
	subMgrCfg.Service = serviceName
	subMgrCfg.Basekey = baseKey
	subMgrCfg.MetricsEnabled = true
	subMgrCfg.ExpiredSubCb = ExpiredSubscriptionCb
	subMgrCfg.PeriodicSubCb = PeriodicSubscriptionCb
	subMgr, err = sm.NewSubscriptionMgr(&subMgrCfg, redisAddr)
	if err != nil {
		log.Error("Failed to create Subscription Manager. Error: ", err)
@@ -642,8 +647,8 @@ func checkAllStaDataRateNotification(staId *StaIdentity, dataRateDl int32, dataR
}

func checkStaDataRateNotification(sub *sm.Subscription, staId *StaIdentity, dataRateDl int32, dataRateUl int32) {
	// Ignore periodic subscriptions
	if sub.Cfg.PeriodicInterval > 0 {
	// Make sure subscription is ready to send notifications
	if !subMgr.ReadyToSend(sub) {
		return
	}

@@ -763,7 +768,9 @@ func checkStaDataRateNotification(sub *sm.Subscription, staId *StaIdentity, data
	}

	log.Info("Sending STA Data Rate notification for sub: ", sub.Cfg.Id)
	go subMgr.SendNotification(sub, jsonNotif)
	go func() {
		_ = subMgr.SendNotification(sub, jsonNotif)
	}()
}

// func sendStaDataRateNotification(notifyUrl string, notification StaDataRateNotification) {
@@ -877,8 +884,8 @@ func checkAllAssocStaNotification(staMacIds []string, apMacId string) {
}

func checkAssocStaNotification(sub *sm.Subscription, staMacIds []string, apMacId string) {
	// Ignore periodic subscriptions
	if sub.Cfg.PeriodicInterval > 0 {
	// Make sure subscription is ready to send notifications
	if !subMgr.ReadyToSend(sub) {
		return
	}

@@ -932,7 +939,9 @@ func checkAssocStaNotification(sub *sm.Subscription, staMacIds []string, apMacId
	}

	log.Info("Sending Assoc STA notification for sub: ", sub.Cfg.Id)
	go subMgr.SendNotification(sub, jsonNotif)
	go func() {
		_ = subMgr.SendNotification(sub, jsonNotif)
	}()
}

// func sendAssocStaNotification(notifyUrl string, notification AssocStaNotification) {
@@ -1483,7 +1492,10 @@ func ExpiredSubscriptionCb(sub *sm.Subscription) {
	}

	// Send expiry notification
	subMgr.SendNotification(sub, jsonNotif)
	log.Info("Sending Expiry notification for sub: ", sub.Cfg.Id)
	go func() {
		_ = subMgr.SendNotification(sub, jsonNotif)
	}()
}

// func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
@@ -1507,6 +1519,8 @@ func ExpiredSubscriptionCb(sub *sm.Subscription) {

func PeriodicSubscriptionCb(sub *sm.Subscription) {

	log.Debug("PeriodicSubscriptionCb")

	switch sub.Cfg.Type {
	case ASSOC_STA_SUBSCRIPTION:
		// Get AP Info list
+30 −18
Original line number Diff line number Diff line
@@ -24,7 +24,7 @@ import (
	"io"
	"net/http"
	"net/http/httptest"
	"strconv"
	"strings"
	"testing"
	"time"

@@ -5910,15 +5910,15 @@ func TestSuccessSubscriptionAssocSta(t *testing.T) {
	fmt.Println("Set a scenario")
	initialiseScenario(testScenario)
	//post
	expectedGetResp := testSubscriptionAssocStaPost(t)
	subId, expectedGetResp := testSubscriptionAssocStaPost(t)
	//get
	testSubscriptionGet(t, strconv.Itoa(nextSubscriptionIdAvailable-1), expectedGetResp)
	testSubscriptionGet(t, subId, expectedGetResp)
	//put
	expectedGetResp = testSubscriptionAssocStaPut(t, strconv.Itoa(nextSubscriptionIdAvailable-1), true)
	expectedGetResp = testSubscriptionAssocStaPut(t, subId, true)
	//get
	testSubscriptionGet(t, strconv.Itoa(nextSubscriptionIdAvailable-1), expectedGetResp)
	testSubscriptionGet(t, subId, expectedGetResp)
	//delete
	testSubscriptionDelete(t, strconv.Itoa(nextSubscriptionIdAvailable-1), true)
	testSubscriptionDelete(t, subId, true)
	terminateScenario()
}

@@ -5941,13 +5941,13 @@ func TestFailSubscriptionAssocSta(t *testing.T) {
	initialiseScenario(testScenario)

	//get
	testSubscriptionGet(t, strconv.Itoa(nextSubscriptionIdAvailable), "")
	testSubscriptionGet(t, "invalidSubId", "")

	//put
	_ = testSubscriptionAssocStaPut(t, strconv.Itoa(nextSubscriptionIdAvailable), false)
	_ = testSubscriptionAssocStaPut(t, "invalidSubId", false)

	//delete
	testSubscriptionDelete(t, strconv.Itoa(nextSubscriptionIdAvailable), false)
	testSubscriptionDelete(t, "invalidSubId", false)

	terminateScenario()
}
@@ -5971,15 +5971,15 @@ func TestSubscriptionsListGet(t *testing.T) {
	initialiseScenario(testScenario)

	//post
	_ = testSubscriptionAssocStaPost(t)
	_ = testSubscriptionAssocStaPost(t)
	subId1, _ := testSubscriptionAssocStaPost(t)
	subId2, _ := testSubscriptionAssocStaPost(t)

	//get list
	testSubscriptionListGet(t)

	//delete
	testSubscriptionDelete(t, strconv.Itoa(nextSubscriptionIdAvailable-2), true)
	testSubscriptionDelete(t, strconv.Itoa(nextSubscriptionIdAvailable-1), true)
	testSubscriptionDelete(t, subId1, true)
	testSubscriptionDelete(t, subId2, true)

	terminateScenario()
}
@@ -6025,7 +6025,7 @@ func testSubscriptionListGet(t *testing.T) {
	}
}

func testSubscriptionAssocStaPost(t *testing.T) string {
func testSubscriptionAssocStaPost(t *testing.T) (string, string) {

	/******************************
	 * request vars section
@@ -6067,7 +6067,11 @@ func testSubscriptionAssocStaPost(t *testing.T) string {
	/******************************
	 * expected response section
	 ******************************/
	expectedLinkType := LinkType{"/" + testSandboxName + "/wai/v2/subscriptions/" + strconv.Itoa(nextSubscriptionIdAvailable)}
	self := respBody.Links.Self.Href
	fmt.Println("self: " + self)
	var subId = self[strings.LastIndex(self, "/")+1:]
	fmt.Println("subId: " + subId)
	expectedLinkType := LinkType{respBody.Links.Self.Href}
	expectedResponse := AssocStaSubscription{&AssocStaSubscriptionLinks{&expectedLinkType}, &apId, callBackRef, nil /*&expectedExpiry*/, &trigger, 0, false, ASSOC_STA_SUBSCRIPTION, nil}
	expectedResponseStr, err := json.Marshal(expectedResponse)
	if err != nil {
@@ -6077,7 +6081,7 @@ func testSubscriptionAssocStaPost(t *testing.T) string {
	if rr != string(expectedResponseStr) {
		t.Fatalf("Failed to get expected response")
	}
	return string(expectedResponseStr)
	return subId, string(expectedResponseStr)
}

func testSubscriptionAssocStaPut(t *testing.T, subscriptionId string, expectSuccess bool) string {
@@ -6365,7 +6369,13 @@ func TestSubscriptionAssocStaNotification(t *testing.T) {
	 * request execution section
	 ******************************/

	_, err = sendRequest(http.MethodPost, "/subscriptions", bytes.NewBuffer(body), nil, nil, http.StatusCreated, SubscriptionsPOST)
	rr, err := sendRequest(http.MethodPost, "/subscriptions", bytes.NewBuffer(body), nil, nil, http.StatusCreated, SubscriptionsPOST)
	if err != nil {
		t.Fatalf("Failed to get expected response")
	}

	var respBody AssocStaSubscription
	err = json.Unmarshal([]byte(rr), &respBody)
	if err != nil {
		t.Fatalf("Failed to get expected response")
	}
@@ -6409,7 +6419,9 @@ func TestSubscriptionAssocStaNotification(t *testing.T) {
	}

	//cleanup allocated subscription
	testSubscriptionDelete(t, strconv.Itoa(nextSubscriptionIdAvailable-1), true)
	self := respBody.Links.Self.Href
	var subId = self[strings.LastIndex(self, "/")+1:]
	testSubscriptionDelete(t, subId, true)

	/******************************
	 * back to initial state section
+4 −1
Original line number Diff line number Diff line
@@ -4,9 +4,10 @@ go 1.12

require (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics v0.0.0
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0
	github.com/google/uuid v1.3.0
	github.com/gorilla/websocket v1.4.2
	github.com/onsi/ginkgo v1.16.5 // indirect
	github.com/onsi/gomega v1.16.0 // indirect
@@ -14,6 +15,8 @@ require (

replace (
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr => ../../go-packages/meep-data-key-mgr
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger => ../../go-packages/meep-http-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics => ../../go-packages/meep-metrics
	github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis
)
+353 −5

File changed.

Preview size limit exceeded, changes collapsed.

+35 −25
Original line number Diff line number Diff line
@@ -35,11 +35,13 @@ type SubscriptionMgrCfg struct {
	Module         string
	Sandbox        string
	Mep            string
	BasePath      string
	expiredSubCb  ExpiredSubscriptionCb
	periodicSubCb PeriodicSubscriptionCb
	testNotifCb   TestNotificationCb
	notifRespCb   NotificationRespCb
	Service        string
	Basekey        string
	MetricsEnabled bool
	ExpiredSubCb   ExpiredSubscriptionCb
	PeriodicSubCb  PeriodicSubscriptionCb
	TestNotifCb    TestNotificationCb
	NotifRespCb    NotificationRespCb
}

type SubscriptionMgr struct {
@@ -71,8 +73,12 @@ func NewSubscriptionMgr(cfg *SubscriptionMgrCfg, addr string) (sm *SubscriptionM
	log.Info("Connected to Subscription Manager Redis DB")

	// Get base store key
	if cfg.Basekey != "" {
		sm.baseKey = cfg.Basekey
	} else {
		// data:sbox:<sandbox-name>:<module-name>:mep:<mep-name>:app:<app-id>:sub:<sub-type>:<sub-id>
	sm.baseKey = dkm.GetKeyRoot(cfg.Sandbox) + ":" + cfg.Module + ":mep:" + cfg.Mep + ":"
		sm.baseKey = dkm.GetKeyRoot(cfg.Sandbox) + cfg.Module + ":mep:" + cfg.Mep + ":"
	}

	// Initialize subscription cache from store
	var subList []*Subscription
@@ -239,17 +245,19 @@ func (sm *SubscriptionMgr) GetSubscriptionList(AppId string, Type string) ([]*Su
}

func (sm *SubscriptionMgr) GenerateSubscriptionId() string {
	randomStr, _ := generateRand(10)
	randomStr, _ := generateRand(12)
	// return uuid.New().String()
	return "sub" + randomStr
	return randomStr
}

func (sm *SubscriptionMgr) ForEachSubscription(AppId string, Type string, cb func(sub *Subscription) error, userData interface{}) error {

	// Iterate through subscriptions and invoke provided function
	// Useful when looking for matching subscriptions to trigger notifications
	// 	   Notifications to be sent either directly via REST calls or via Websocket messages
	return nil
func (sm *SubscriptionMgr) ReadyToSend(sub *Subscription) bool {
	if sub.State != StateReady {
		return false
	}
	if sub.Cfg.PeriodicInterval > 0 && sub.PeriodicCounter != periodicCounterPending {
		return false
	}
	return true
}

func (sm *SubscriptionMgr) SendNotification(sub *Subscription, notif []byte) error {
@@ -259,10 +267,9 @@ func (sm *SubscriptionMgr) SendNotification(sub *Subscription, notif []byte) err
	}

	// Send notification
	err := sub.sendNotification(notif)
	err := sub.sendNotification(sm.cfg, notif)
	if err != nil {
		log.Error(err.Error())
		return err
	}

	sm.mutex.Lock()
@@ -273,7 +280,7 @@ func (sm *SubscriptionMgr) SendNotification(sub *Subscription, notif []byte) err
		sub.PeriodicCounter = sub.Cfg.PeriodicInterval
	}

	return nil
	return err
}

func (sm *SubscriptionMgr) delSubscription(sub *Subscription) error {
@@ -302,7 +309,7 @@ func (sm *SubscriptionMgr) runTicker() {
	defer sm.mutex.Unlock()

	// Check for expired subscriptions
	if sm.cfg.expiredSubCb != nil {
	if sm.cfg.ExpiredSubCb != nil {
		var expiredSubList []*Subscription
		currentTime := time.Now()

@@ -313,7 +320,9 @@ func (sm *SubscriptionMgr) runTicker() {
			} else if sub.ExpiryTime != nil && currentTime.After(*sub.ExpiryTime) {
				// Set state to expired & invoke expiry callback
				sub.State = StateExpired
				go sm.cfg.expiredSubCb(sub)

				log.Debug("Invoking expiry callback for sub: ", sub.Cfg.Id)
				go sm.cfg.ExpiredSubCb(sub)
			}
		}

@@ -324,7 +333,7 @@ func (sm *SubscriptionMgr) runTicker() {
	}

	// Trigger periodic notifications
	if sm.cfg.periodicSubCb != nil {
	if sm.cfg.PeriodicSubCb != nil {
		for _, sub := range sm.subscriptions {
			if sub.Cfg.PeriodicInterval > 0 {
				if sub.PeriodicCounter > 0 {
@@ -335,7 +344,8 @@ func (sm *SubscriptionMgr) runTicker() {
					sub.PeriodicCounter = periodicCounterPending

					// Invoke periodic callback
					go sm.cfg.periodicSubCb(sub)
					log.Debug("Invoking periodic callback for sub: ", sub.Cfg.Id)
					go sm.cfg.PeriodicSubCb(sub)
				}
			}
		}
Loading