Unverified Commit f0059960 authored by Kevin Di Lallo's avatar Kevin Di Lallo Committed by GitHub
Browse files

Merge pull request #127 from pastorsx/sp_dev_serv_thread_safe

Thread safe operations in subscription based services
parents e7936dc4 b990d7d1
Loading
Loading
Loading
Loading
+31 −2
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import (
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-loc-serv/sbi"
@@ -104,6 +105,7 @@ var sandboxName string
var basePath string
var baseKey string
var sessionMgr *sm.SessionMgr
var mutex sync.Mutex

// Init - Location Service initialization
func Init() (err error) {
@@ -203,6 +205,8 @@ func createClient(notifyPath string) (*clientNotifOMA.APIClient, error) {

func deregisterZoneStatus(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	zoneStatusSubscriptionMap[subsId] = nil
}

@@ -227,12 +231,15 @@ func registerZoneStatus(zoneId string, nbOfUsersZoneThreshold int32, nbOfUsersAP
	zoneStatus.NbUsersInZoneThreshold = (int)(nbOfUsersZoneThreshold)
	zoneStatus.NbUsersInAPThreshold = (int)(nbOfUsersAPThreshold)
	zoneStatus.ZoneId = zoneId

	mutex.Lock()
	defer mutex.Unlock()
	zoneStatusSubscriptionMap[subsId] = &zoneStatus
}

func deregisterZonal(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	zonalSubscriptionMap[subsId] = ""
	zonalSubscriptionEnteringMap[subsId] = ""
	zonalSubscriptionLeavingMap[subsId] = ""
@@ -243,6 +250,8 @@ func registerZonal(zoneId string, event []UserEventType, subsIdStr string) {

	subsId, _ := strconv.Atoi(subsIdStr)

	mutex.Lock()
	defer mutex.Unlock()
	if event != nil {
		for i := 0; i < len(event); i++ {
			switch event[i] {
@@ -265,6 +274,8 @@ func registerZonal(zoneId string, event []UserEventType, subsIdStr string) {

func deregisterUser(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	userSubscriptionMap[subsId] = ""
	userSubscriptionEnteringMap[subsId] = ""
	userSubscriptionLeavingMap[subsId] = ""
@@ -274,7 +285,8 @@ func deregisterUser(subsIdStr string) {
func registerUser(userAddress string, event []UserEventType, subsIdStr string) {

	subsId, _ := strconv.Atoi(subsIdStr)

	mutex.Lock()
	defer mutex.Unlock()
	if event != nil {
		for i := 0; i < len(event); i++ {
			switch event[i] {
@@ -311,6 +323,9 @@ func checkNotificationRegistrations(checkType int, param1 string, param2 string,

func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersInAPStr string, nbUsersInZoneStr string) {

	mutex.Lock()
	defer mutex.Unlock()

	//check all that applies
	for subsId, zoneStatus := range zoneStatusSubscriptionMap {
		if zoneStatus == nil {
@@ -367,6 +382,8 @@ func checkNotificationRegisteredZoneStatus(zoneId string, apId string, nbUsersIn

func checkNotificationRegisteredUsers(oldZoneId string, newZoneId string, oldApId string, newApId string, userId string) {

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, value := range userSubscriptionMap {
		if value == userId {
@@ -472,6 +489,9 @@ func sendStatusNotification(notifyUrl string, ctx context.Context, subscriptionI

func checkNotificationRegisteredZones(oldZoneId string, newZoneId string, oldApId string, newApId string, userId string) {

	mutex.Lock()
	defer mutex.Unlock()

	//check all that applies
	for subsId, value := range zonalSubscriptionMap {

@@ -1304,6 +1324,8 @@ func cleanUp() {
	nextUserSubscriptionIdAvailable = 1
	nextZoneStatusSubscriptionIdAvailable = 1

	mutex.Lock()
	defer mutex.Unlock()
	zonalSubscriptionEnteringMap = map[int]string{}
	zonalSubscriptionLeavingMap = map[int]string{}
	zonalSubscriptionTransferringMap = map[int]string{}
@@ -1440,6 +1462,8 @@ func zoneStatusReInit() {
	_ = rc.ForEachJSONEntry(keyName, populateZoneStatusList, &zoneList)

	maxZoneStatusSubscriptionId := 0
	mutex.Lock()
	defer mutex.Unlock()
	for _, zone := range zoneList.ZoneStatusSubscription {
		resourceUrl := strings.Split(zone.ResourceURL, "/")
		subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1])
@@ -1482,6 +1506,8 @@ func zonalTrafficReInit() {
	_ = rc.ForEachJSONEntry(keyName, populateZonalTrafficList, &zoneList)

	maxZonalSubscriptionId := 0
	mutex.Lock()
	defer mutex.Unlock()
	for _, zone := range zoneList.ZonalTrafficSubscription {
		resourceUrl := strings.Split(zone.ResourceURL, "/")
		subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1])
@@ -1517,6 +1543,9 @@ func userTrackingReInit() {
	_ = rc.ForEachJSONEntry(keyName, populateUserTrackingList, &userList)

	maxUserSubscriptionId := 0
	mutex.Lock()
	defer mutex.Unlock()

	for _, user := range userList.UserTrackingSubscription {
		resourceUrl := strings.Split(user.ResourceURL, "/")
		subscriptionId, err := strconv.Atoi(resourceUrl[len(resourceUrl)-1])
+65 −11
Original line number Diff line number Diff line
@@ -26,6 +26,7 @@ import (
	"os"
	"strconv"
	"strings"
	"sync"
	"time"

	sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-rnis/sbi"
@@ -68,6 +69,7 @@ var hostUrl *url.URL
var sandboxName string
var basePath string
var baseKey string
var mutex sync.Mutex

var expiryTicker *time.Ticker

@@ -307,6 +309,8 @@ func createClient(notifyPath string) (*clientNotif.APIClient, error) {
func checkForExpiredSubscriptions() {

	nowTime := int(time.Now().Unix())
	mutex.Lock()
	defer mutex.Unlock()
	for expiryTime, subsIndexList := range subscriptionExpiryMap {
		if expiryTime <= nowTime {
			subscriptionExpiryMap[expiryTime] = nil
@@ -337,7 +341,6 @@ func checkForExpiredSubscriptions() {
			}
		}
	}

}

func repopulateCcSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
@@ -354,6 +357,9 @@ func repopulateCcSubscriptionMap(key string, jsonInfo string, userData interface
	subsIdStr := selfUrl[len(selfUrl)-1]
	subsId, _ := strconv.Atoi(subsIdStr)

	mutex.Lock()
	defer mutex.Unlock()

	ccSubscriptionMap[subsId] = &subscription
	if subscription.ExpiryDeadline != nil {
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
@@ -383,6 +389,9 @@ func repopulateReSubscriptionMap(key string, jsonInfo string, userData interface
	subsIdStr := selfUrl[len(selfUrl)-1]
	subsId, _ := strconv.Atoi(subsIdStr)

	mutex.Lock()
	defer mutex.Unlock()

	reSubscriptionMap[subsId] = &subscription
	if subscription.ExpiryDeadline != nil {
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
@@ -412,6 +421,9 @@ func repopulateRrSubscriptionMap(key string, jsonInfo string, userData interface
	subsIdStr := selfUrl[len(selfUrl)-1]
	subsId, _ := strconv.Atoi(subsIdStr)

	mutex.Lock()
	defer mutex.Unlock()

	rrSubscriptionMap[subsId] = &subscription
	if subscription.ExpiryDeadline != nil {
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
@@ -434,6 +446,8 @@ func checkCcNotificationRegisteredSubscriptions(appId string, assocId *Associate
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, sub := range ccSubscriptionMap {

@@ -550,6 +564,8 @@ func checkReNotificationRegisteredSubscriptions(appId string, assocId *Associate
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, sub := range reSubscriptionMap {

@@ -647,6 +663,8 @@ func checkRrNotificationRegisteredSubscriptions(appId string, assocId *Associate
		return
	}

	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, sub := range rrSubscriptionMap {

@@ -866,34 +884,52 @@ func cellChangeSubscriptionsGET(w http.ResponseWriter, r *http.Request) {
}

func isSubscriptionIdRegisteredCc(subsIdStr string) bool {
	var returnVal bool
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	if ccSubscriptionMap[subsId] != nil {
		return true
		returnVal = true
	} else {
		return false
		returnVal = false
	}
	return returnVal
}

func isSubscriptionIdRegisteredRe(subsIdStr string) bool {
	subsId, _ := strconv.Atoi(subsIdStr)
	var returnVal bool
	mutex.Lock()
	defer mutex.Unlock()

	if reSubscriptionMap[subsId] != nil {
		return true
		returnVal = true
	} else {
		return false
		returnVal = false
	}
	return returnVal
}

func isSubscriptionIdRegisteredRr(subsIdStr string) bool {
	subsId, _ := strconv.Atoi(subsIdStr)
	var returnVal bool
	mutex.Lock()
	defer mutex.Unlock()

	if rrSubscriptionMap[subsId] != nil {
		return true
		returnVal = true
	} else {
		return false
		returnVal = false
	}
	return returnVal
}

func registerCc(cellChangeSubscription *CellChangeSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	ccSubscriptionMap[subsId] = cellChangeSubscription
	if cellChangeSubscription.ExpiryDeadline != nil {
		//get current list of subscription meant to expire at this time
@@ -901,12 +937,14 @@ func registerCc(cellChangeSubscription *CellChangeSubscription, subsIdStr string
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(cellChangeSubscription.ExpiryDeadline.Seconds)] = intList
	}

	log.Info("New registration: ", subsId, " type: ", cellChangeSubscriptionType)
}

func registerRe(rabEstSubscription *RabEstSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	reSubscriptionMap[subsId] = rabEstSubscription
	if rabEstSubscription.ExpiryDeadline != nil {
		//get current list of subscription meant to expire at this time
@@ -914,12 +952,14 @@ func registerRe(rabEstSubscription *RabEstSubscription, subsIdStr string) {
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(rabEstSubscription.ExpiryDeadline.Seconds)] = intList
	}

	log.Info("New registration: ", subsId, " type: ", rabEstSubscriptionType)
}

func registerRr(rabRelSubscription *RabRelSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	rrSubscriptionMap[subsId] = rabRelSubscription
	if rabRelSubscription.ExpiryDeadline != nil {
		//get current list of subscription meant to expire at this time
@@ -927,24 +967,32 @@ func registerRr(rabRelSubscription *RabRelSubscription, subsIdStr string) {
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(rabRelSubscription.ExpiryDeadline.Seconds)] = intList
	}

	log.Info("New registration: ", subsId, " type: ", rabRelSubscriptionType)
}

func deregisterCc(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	ccSubscriptionMap[subsId] = nil
	log.Info("Deregistration: ", subsId, " type: ", cellChangeSubscriptionType)
}

func deregisterRe(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	reSubscriptionMap[subsId] = nil
	log.Info("Deregistration: ", subsId, " type: ", rabEstSubscriptionType)
}

func deregisterRr(subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()

	rrSubscriptionMap[subsId] = nil
	log.Info("Deregistration: ", subsId, " type: ", rabRelSubscriptionType)
}
@@ -1574,6 +1622,9 @@ func createSubscriptionLinkList(subType string) *SubscriptionLinkList {

	//loop through all different types of subscription

	mutex.Lock()
	defer mutex.Unlock()

	if subType == "" || subType == cellChangeSubscriptionType {
		//loop through cell_change map
		for _, ccSubscription := range ccSubscriptionMap {
@@ -1698,11 +1749,14 @@ func cleanUp() {
	nextSubscriptionIdAvailable = 1
	nextAvailableErabId = 1

	mutex.Lock()
	defer mutex.Unlock()

	ccSubscriptionMap = map[int]*CellChangeSubscription{}
	reSubscriptionMap = map[int]*RabEstSubscription{}
	rrSubscriptionMap = map[int]*RabRelSubscription{}

	subscriptionExpiryMap = map[int][]int{}

	updateStoreName("")
}