Newer
Older
/*
 * Copyright (c) 2020  InterDigital Communications, Inc
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package server
import (
	"net/http"
	"net/url"
	"os"
	"reflect"
	"strconv"
	"strings"
	"time"
	sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-wais/sbi"
	dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
	httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
Kevin Di Lallo
committed
	met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
	redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
	"github.com/gorilla/mux"
)
const waisBasePath = "/wai/v2/"
Kevin Di Lallo
committed
const waisKey = "wais:"
const logModuleWAIS = "meep-wais"
const serviceName = "WAI Service"
const (
Simon Pastor
committed
	notifAssocSta    = "AssocStaNotification"
	notifStaDataRate = "StaDataRateNotification"
	notifExpiry      = "ExpiryNotification"
	notifTest        = "TestNotification"
Kevin Di Lallo
committed
)
Simon Pastor
committed
var metricStore *met.MetricStore
var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"
const assocStaSubscriptionType = "AssocStaSubscription"
Simon Pastor
committed
const staDataRateSubscriptionType = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_SUBSCRIPTION = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_NOTIFICATION = "StaDataRateNotification"
const TEST_NOTIFICATION = "TestNotification"
var assocStaSubscriptionMap = map[int]*AssocStaSubscription{}
Simon Pastor
committed
var staDataRateSubscriptionMap = map[int]*StaDataRateSubscription{}
var subscriptionExpiryMap = map[int][]int{}
var currentStoreName = ""
var WAIS_DB = 5
var rc *redis.Connector
var hostUrl *url.URL
var sandboxName string
var basePath string
var baseKey string
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
type ApInfoComplete struct {
	ApId       ApIdentity
	ApLocation ApLocation
	StaMacIds  []string
}
Simon Pastor
committed
type StaData struct {
	StaInfo  *StaInfo `json:"staInfo"`
	AppNames []string `json:"appNames"`
}
type StaInfoResp struct {
	StaInfoList []StaInfo
}
type ApInfoResp struct {
	ApInfoList []ApInfo
}
Simon Pastor
committed
func notImplemented(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	w.WriteHeader(http.StatusNotImplemented)
}
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
func Init() (err error) {
	// Retrieve Sandbox name from environment variable
	sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
	if sandboxNameEnv != "" {
		sandboxName = sandboxNameEnv
	}
	if sandboxName == "" {
		err = errors.New("MEEP_SANDBOX_NAME env variable not set")
		log.Error(err.Error())
		return err
	}
	log.Info("MEEP_SANDBOX_NAME: ", sandboxName)
	// hostUrl is the url of the node serving the resourceURL
	// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
	hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
	if err != nil || hostUrl == nil || hostUrl.String() == "" {
		hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
		if err != nil {
			hostUrl = new(url.URL)
		}
	}
	log.Info("resource URL: ", hostUrl)
	// Set base path
	basePath = "/" + sandboxName + waisBasePath
	// Get base store key
	baseKey = dkm.GetKeyRoot(sandboxName) + waisKey
	// Connect to Redis DB
	rc, err = redis.NewConnector(redisAddr, WAIS_DB)
	if err != nil {
		log.Error("Failed connection to Redis DB. Error: ", err)
		return err
	}
	_ = rc.DBFlush(baseKey)
	log.Info("Connected to Redis DB, RNI service table")
	reInit()
	expiryTicker = time.NewTicker(time.Second)
	go func() {
		for range expiryTicker.C {
			checkForExpiredSubscriptions()
		}
	}()
	// Initialize SBI
	sbiCfg := sbi.SbiCfg{
		SandboxName:    sandboxName,
		RedisAddr:      redisAddr,
		StaInfoCb:      updateStaInfo,
		ApInfoCb:       updateApInfo,
		ScenarioNameCb: updateStoreName,
		CleanUpCb:      cleanUp,
	}
	err = sbi.Init(sbiCfg)
	if err != nil {
		log.Error("Failed initialize SBI. Error: ", err)
		return err
	}
	log.Info("SBI Initialized")
	return nil
}
// reInit - finds the value already in the DB to repopulate local stored info
func reInit() {
	//next available subsId will be overrriden if subscriptions already existed
	nextSubscriptionIdAvailable = 1
	keyName := baseKey + "subscription:" + "*"
	_ = rc.ForEachJSONEntry(keyName, repopulateAssocStaSubscriptionMap, nil)
Simon Pastor
committed
	_ = rc.ForEachJSONEntry(keyName, repopulateStaDataRateSubscriptionMap, nil)
}
// Run - Start WAIS
func Run() (err error) {
	return sbi.Run()
}
// Stop - Stop WAIS
func Stop() (err error) {
	return sbi.Stop()
}
Simon Pastor
committed
func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, appNames []string) {
	// Get STA Info from DB, if any
Simon Pastor
committed
	var staData *StaData
	jsonStaData, _ := rc.JSONGetEntry(baseKey+"UE:"+name, ".")
	if jsonStaData != "" {
		staData = convertJsonToStaData(jsonStaData)
	}
	//update data rates
	sumUl := 0.0
	sumDl := 0.0
	for _, appName := range appNames {
		metricsArray, err := metricStore.GetCachedNetworkMetrics("*", appName)
		if err != nil {
			log.Error("Failed to get network metric:", err)
		}
		//downlink for the app is uplink for the UE, and vice-versa
		for _, metrics := range metricsArray {
			sumUl += metrics.DlTput
			sumDl += metrics.UlTput
		}
Simon Pastor
committed
	var dataRate StaDataRate
	dataRate.StaLastDataDownlinkRate = int32(sumDl * 1000) //kbps
	dataRate.StaLastDataUplinkRate = int32(sumUl * 1000)   //kbps
	// Update DB if STA Info does not exist or has changed
Simon Pastor
committed
	if staData == nil || isStaInfoUpdateRequired(staData.StaInfo, ownMacId, apMacId, rssi, &dataRate) {
		// Set STA Mac ID
Simon Pastor
committed
		if staData == nil {
			staData = new(StaData)
			staData.StaInfo = new(StaInfo)
			staData.StaInfo.StaId = new(StaIdentity)
Simon Pastor
committed
		staData.StaInfo.StaId.MacId = ownMacId
		// Set Associated AP, if any
		if apMacId == "" {
Simon Pastor
committed
			staData.StaInfo.ApAssociated = nil
Simon Pastor
committed
			if staData.StaInfo.ApAssociated == nil {
				staData.StaInfo.ApAssociated = new(ApAssociated)
Simon Pastor
committed
			staData.StaInfo.ApAssociated.Bssid = apMacId
		}
		// Set RSSI
		if rssi != nil {
			var rssiObj Rssi
			rssiObj.Rssi = *rssi
Simon Pastor
committed
			staData.StaInfo.Rssi = &rssiObj
Simon Pastor
committed
			staData.StaInfo.Rssi = nil
Simon Pastor
committed
		dataRate.StaId = staData.StaInfo.StaId
		staData.StaInfo.StaDataRate = &dataRate
		staData.AppNames = appNames
		_ = rc.JSONSetEntry(baseKey+"UE:"+name, ".", convertStaDataToJson(staData))
Simon Pastor
committed
	checkStaDataRateNotificationRegisteredSubscriptions(staData.StaInfo.StaId, dataRate.StaLastDataDownlinkRate, dataRate.StaLastDataUplinkRate, false)
Simon Pastor
committed
func isStaInfoUpdateRequired(staInfo *StaInfo, ownMacId string, apMacId string, rssi *int32, dataRate *StaDataRate) bool {
	// Check if STA Info exists
	if staInfo == nil {
		return true
	}
	// Compare STA Mac
	if ownMacId != staInfo.StaId.MacId {
		return true
	}
	// Compare AP Mac
	if (apMacId == "" && staInfo.ApAssociated != nil) ||
Simon Pastor
committed
		(apMacId != "" && (staInfo.ApAssociated == nil || apMacId != staInfo.ApAssociated.Bssid)) {
	if (rssi == nil && staInfo.Rssi != nil) ||
		(rssi != nil && staInfo.Rssi == nil) ||
		(rssi != nil && staInfo.Rssi != nil && *rssi != staInfo.Rssi.Rssi) {
		return true
	}
Simon Pastor
committed
	if dataRate.StaLastDataDownlinkRate != staInfo.StaDataRate.StaLastDataDownlinkRate || dataRate.StaLastDataUplinkRate != staInfo.StaDataRate.StaLastDataUplinkRate {
		return true
	}
	str := fmt.Sprintf("%f", *value)
	strArray := strings.Split(str, ".")
	integerPart, err := strconv.Atoi(strArray[0])
	if err != nil {
		log.Error("Can't convert float to int")
		return 0
	}
	fractionPart, err := strconv.Atoi(strArray[1])
	if err != nil {
		log.Error("Can't convert float to int")
		return 0
	}
	//9 first bits are the integer part, last 23 bits are fraction part
	valueToReturn := (integerPart << 23) + fractionPart
	return int32(valueToReturn)
}
func isUpdateApInfoNeeded(jsonApInfoComplete string, newLong int32, newLat int32, staMacIds []string) bool {
	var oldStaMacIds []string
	var oldLat int32 = 0
	var oldLong int32 = 0
	if jsonApInfoComplete == "" {
		return true
	} else {
		apInfoComplete := convertJsonToApInfoComplete(jsonApInfoComplete)
		oldStaMacIds = apInfoComplete.StaMacIds
		if apInfoComplete.ApLocation.Geolocation != nil {
			oldLat = int32(apInfoComplete.ApLocation.Geolocation.Lat)
			oldLong = int32(apInfoComplete.ApLocation.Geolocation.Long)
	//if AP moved
	if oldLat != newLat || oldLong != newLong {
		return true
	}
	//if number of STAs connected changes
	if len(oldStaMacIds) != len(staMacIds) {
		return true
	}
	//if the list of connected STAs is different
	return !reflect.DeepEqual(oldStaMacIds, staMacIds)
}
func updateApInfo(name string, apMacId string, longitude *float32, latitude *float32, staMacIds []string) {
	//get from DB
	jsonApInfoComplete, _ := rc.JSONGetEntry(baseKey+"AP:"+name, ".")
	newLat := convertFloatToGeolocationFormat(latitude)
	newLong := convertFloatToGeolocationFormat(longitude)
	if isUpdateApInfoNeeded(jsonApInfoComplete, newLong, newLat, staMacIds) {
		//updateDB
		var apInfoComplete ApInfoComplete
		var apLocation ApLocation
		var geoLocation GeoLocation
		var apId ApIdentity
Simon Pastor
committed
		geoLocation.Lat = int32(newLat)
		geoLocation.Long = int32(newLong)
Simon Pastor
committed
		apId.Bssid = apMacId
		apInfoComplete.ApId = apId
		_ = rc.JSONSetEntry(baseKey+"AP:"+name, ".", convertApInfoCompleteToJson(&apInfoComplete))
Simon Pastor
committed
		checkAssocStaNotificationRegisteredSubscriptions(staMacIds, apMacId, false)
	}
}
func checkForExpiredSubscriptions() {
	nowTime := int(time.Now().Unix())
	for expiryTime, subsIndexList := range subscriptionExpiryMap {
		if expiryTime <= nowTime {
			subscriptionExpiryMap[expiryTime] = nil
			for _, subsId := range subsIndexList {
				if assocStaSubscriptionMap[subsId] != nil {
					subsIdStr := strconv.Itoa(subsId)
Simon Pastor
committed
					var expiryTimeStamp TimeStamp
					expiryTimeStamp.Seconds = int32(expiryTime)
					link := new(ExpiryNotificationLinks)
					linkType := new(LinkType)
					linkType.Href = assocStaSubscriptionMap[subsId].CallbackReference
					link.Subscription = linkType
					notif.Links = link
					notif.ExpiryDeadline = &expiryTimeStamp
					sendExpiryNotification(link.Subscription.Href, notif)
					_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
				}
				if staDataRateSubscriptionMap[subsId] != nil {
					subsIdStr := strconv.Itoa(subsId)
					var notif ExpiryNotification
					expiryTimeStamp.Seconds = int32(expiryTime)
Simon Pastor
committed
					linkType := new(LinkType)
					linkType.Href = staDataRateSubscriptionMap[subsId].CallbackReference
					link.Subscription = linkType
					notif.Links = link
					notif.ExpiryDeadline = &expiryTimeStamp
Simon Pastor
committed
					sendExpiryNotification(link.Subscription.Href, notif)
					_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
Simon Pastor
committed
func repopulateAssocStaSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
	// Format response
	err := json.Unmarshal([]byte(jsonInfo), &subscription)
	if err != nil {
		return err
	}
	selfUrl := strings.Split(subscription.Links.Self.Href, "/")
	subsIdStr := selfUrl[len(selfUrl)-1]
	subsId, _ := strconv.Atoi(subsIdStr)
	assocStaSubscriptionMap[subsId] = &subscription
	if subscription.ExpiryDeadline != nil {
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
	}
	//reinitialisation of next available Id for future subscription request
	if subsId >= nextSubscriptionIdAvailable {
		nextSubscriptionIdAvailable = subsId + 1
	}
	return nil
}
Simon Pastor
committed
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
func repopulateStaDataRateSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
	var subscription StaDataRateSubscription
	// Format response
	err := json.Unmarshal([]byte(jsonInfo), &subscription)
	if err != nil {
		return err
	}
	selfUrl := strings.Split(subscription.Links.Self.Href, "/")
	subsIdStr := selfUrl[len(selfUrl)-1]
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	staDataRateSubscriptionMap[subsId] = &subscription
	if subscription.ExpiryDeadline != nil {
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
	}
	//reinitialisation of next available Id for future subscription request
	if subsId >= nextSubscriptionIdAvailable {
		nextSubscriptionIdAvailable = subsId + 1
	}
	return nil
}
func checkAssocStaNotificationRegisteredSubscriptions(staMacIds []string, apMacId string, isPeriodicInvoked bool) {
	//check all that applies
	for subsId, sub := range assocStaSubscriptionMap {
		match := false
		if sub != nil {
Simon Pastor
committed
			//if notification is periodic, only trigger on period expiry
			if sub.NotificationPeriod != 0 && !isPeriodicInvoked {
				continue
			}
			if sub.ApId.Bssid == apMacId {
Simon Pastor
committed
			if match {
				if sub.NotificationEvent != nil {
					match = false
					switch sub.NotificationEvent.Trigger {
					case "1":
						if len(staMacIds) >= int(sub.NotificationEvent.Threshold) {
							match = true
						}
					case "2":
						if len(staMacIds) <= int(sub.NotificationEvent.Threshold) {
							match = true
						}
					default:
					}
				}
			}
			if match {
				subsIdStr := strconv.Itoa(subsId)
				log.Info("Sending WAIS notification ", sub.CallbackReference)
Simon Pastor
committed
				apId.Bssid = apMacId
				notif.ApId = &apId
				for _, staMacId := range staMacIds {
					staId.MacId = staMacId
					notif.StaId = append(notif.StaId, staId)
				}
				log.Info("Assoc Sta Notification" + "(" + subsIdStr + ")")
			}
		}
	}
}
Simon Pastor
committed
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
func checkStaDataRateNotificationRegisteredSubscriptions(staId *StaIdentity, dataRateDl int32, dataRateUl int32, isPeriodicInvoked bool) {
	mutex.Lock()
	defer mutex.Unlock()
	//check all that applies
	for subsId, sub := range staDataRateSubscriptionMap {
		match := false
		if sub != nil {
			//if notification is periodic, only trigger on period expiry
			if sub.NotificationPeriod != 0 && !isPeriodicInvoked {
				continue
			}
			notifToSend := false
			var staDataRateList []StaDataRate
			for _, subStaId := range sub.StaId {
				//check to match every values and at least one when its an array
				if staId.MacId != subStaId.MacId {
					continue
				}
				if staId.Aid != subStaId.Aid {
					continue
				}
				match = true
				for _, ssid := range subStaId.Ssid {
					match = false
					//can only have one ssid at a time
					if ssid == staId.Ssid[0] {
						match = true
						break
					}
				}
				if match {
					for _, ipAddress := range subStaId.IpAddress {
						match = false
						//can only have one ip address
						if ipAddress == staId.IpAddress[0] {
							match = true
							break
						}
					}
				}
				if match {
					if sub.NotificationEvent != nil {
						match = false
						switch sub.NotificationEvent.Trigger {
						case "1":
							if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold {
								match = true
							}
						case "2":
							if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold {
								match = true
							}
						case "3":
							if dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						case "4":
							if dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						case "5":
							if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold && dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						case "6":
							if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold && dataRateUl <= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						case "7":
							if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold || dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						case "8":
							if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold || dataRateUl <= sub.NotificationEvent.UplinkRateThreshold {
								match = true
							}
						default:
						}
					}
				}
				if match {
					var staDataRate StaDataRate
					staDataRate.StaId = staId
					staDataRate.StaLastDataDownlinkRate = dataRateDl
					staDataRate.StaLastDataUplinkRate = dataRateUl
					staDataRateList = append(staDataRateList, staDataRate)
					notifToSend = true
				}
			}
			if notifToSend {
				subsIdStr := strconv.Itoa(subsId)
				log.Info("Sending WAIS notification ", sub.CallbackReference)
				var notif StaDataRateNotification
				seconds := time.Now().Unix()
				var timeStamp TimeStamp
				timeStamp.Seconds = int32(seconds)
				notif.TimeStamp = &timeStamp
				notif.NotificationType = STA_DATA_RATE_NOTIFICATION
				notif.StaDataRate = staDataRateList
				sendStaDataRateNotification(sub.CallbackReference, notif)
				log.Info("Sta Data Rate Notification" + "(" + subsIdStr + ")")
			}
		}
	}
}
func sendTestNotification(notifyUrl string, linkType *LinkType) {
	var notification TestNotification
	notification.NotificationType = TEST_NOTIFICATION
	link := new(ExpiryNotificationLinks)
	link.Subscription = linkType
	notification.Links = link
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err.Error())
	}
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
		met.ObserveNotification(sandboxName, serviceName, notifTest, notifyUrl, nil, duration)
		return
	}
	met.ObserveNotification(sandboxName, serviceName, notifTest, notifyUrl, resp, duration)
	defer resp.Body.Close()
}
func sendAssocStaNotification(notifyUrl string, notification AssocStaNotification) {
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err.Error())
	}
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
Kevin Di Lallo
committed
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
Kevin Di Lallo
committed
		met.ObserveNotification(sandboxName, serviceName, notifAssocSta, notifyUrl, nil, duration)
Kevin Di Lallo
committed
	met.ObserveNotification(sandboxName, serviceName, notifAssocSta, notifyUrl, resp, duration)
Simon Pastor
committed
func sendStaDataRateNotification(notifyUrl string, notification StaDataRateNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err.Error())
	}
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
		met.ObserveNotification(sandboxName, serviceName, notifStaDataRate, notifyUrl, nil, duration)
		return
	}
	met.ObserveNotification(sandboxName, serviceName, notifStaDataRate, notifyUrl, resp, duration)
	defer resp.Body.Close()
}
func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
	startTime := time.Now()
	jsonNotif, err := json.Marshal(notification)
	if err != nil {
		log.Error(err.Error())
	}
	resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
Kevin Di Lallo
committed
	duration := float64(time.Since(startTime).Microseconds()) / 1000.0
	_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
	if err != nil {
		log.Error(err)
Kevin Di Lallo
committed
		met.ObserveNotification(sandboxName, serviceName, notifExpiry, notifyUrl, nil, duration)
Kevin Di Lallo
committed
	met.ObserveNotification(sandboxName, serviceName, notifExpiry, notifyUrl, resp, duration)
}
func subscriptionsGET(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	subIdParamStr := vars["subscriptionId"]
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
	jsonRespDB, _ := rc.JSONGetEntry(baseKey+"subscriptions:"+subIdParamStr, ".")
	if jsonRespDB == "" {
		w.WriteHeader(http.StatusNotFound)
		return
	}
	var subscriptionCommon SubscriptionCommon
	err := json.Unmarshal([]byte(jsonRespDB), &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	var jsonResponse []byte
	switch subscriptionCommon.SubscriptionType {
	case ASSOC_STA_SUBSCRIPTION:
		var subscription AssocStaSubscription
		err = json.Unmarshal([]byte(jsonRespDB), &subscription)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
Simon Pastor
committed
		jsonResponse, err = json.Marshal(subscription)
	case STA_DATA_RATE_SUBSCRIPTION:
		var subscription StaDataRateSubscription
		err = json.Unmarshal([]byte(jsonRespDB), &subscription)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		jsonResponse, err = json.Marshal(subscription)
	default:
		log.Error("Unknown subscription type")
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusOK)
	fmt.Fprintf(w, string(jsonResponse))
}
func isSubscriptionIdRegisteredAssocSta(subsIdStr string) bool {
	if assocStaSubscriptionMap[subsId] != nil {
		return true
	} else {
		return false
	}
}
Simon Pastor
committed
func isSubscriptionIdRegisteredStaDataRate(subsIdStr string) bool {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	if staDataRateSubscriptionMap[subsId] != nil {
		return true
	} else {
		return false
	}
}
func registerAssocSta(subscription *AssocStaSubscription, subsIdStr string) {
	assocStaSubscriptionMap[subsId] = subscription
	if subscription.ExpiryDeadline != nil {
		//get current list of subscription meant to expire at this time
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
	}
	log.Info("New registration: ", subsId, " type: ", subscription.SubscriptionType)
Simon Pastor
committed
	if subscription.RequestTestNotification {
		sendTestNotification(subscription.CallbackReference, subscription.Links.Self)
	}
}
func registerStaDataRate(subscription *StaDataRateSubscription, subsIdStr string) {
	subsId, _ := strconv.Atoi(subsIdStr)
	mutex.Lock()
	defer mutex.Unlock()
	staDataRateSubscriptionMap[subsId] = subscription
	if subscription.ExpiryDeadline != nil {
		//get current list of subscription meant to expire at this time
		intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
		intList = append(intList, subsId)
		subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
	}
	log.Info("New registration: ", subsId, " type: ", subscription.SubscriptionType)
	if subscription.RequestTestNotification {
		sendTestNotification(subscription.CallbackReference, subscription.Links.Self)
	}
func deregisterAssocSta(subsIdStr string, mutexTaken bool) {
	if !mutexTaken {
		mutex.Lock()
		defer mutex.Unlock()
	}
	assocStaSubscriptionMap[subsId] = nil
	log.Info("Deregistration: ", subsId, " type: ", assocStaSubscriptionType)
}
Simon Pastor
committed
func deregisterStaDataRate(subsIdStr string, mutexTaken bool) {
	subsId, _ := strconv.Atoi(subsIdStr)
	if !mutexTaken {
		mutex.Lock()
		defer mutex.Unlock()
	}
	staDataRateSubscriptionMap[subsId] = nil
	log.Info("Deregistration: ", subsId, " type: ", staDataRateSubscriptionType)
}
func subscriptionsPOST(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	var subscriptionCommon SubscriptionCommon
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	//extract common body part
	subscriptionType := subscriptionCommon.SubscriptionType
	//mandatory parameter
	if subscriptionCommon.CallbackReference == "" {
		log.Error("Mandatory CallbackReference parameter not present")
		http.Error(w, "Mandatory CallbackReference parameter not present", http.StatusBadRequest)
		return
	}
	newSubsId := nextSubscriptionIdAvailable
	nextSubscriptionIdAvailable++
	subsIdStr := strconv.Itoa(newSubsId)
	link := new(AssocStaSubscriptionLinks)
	self := new(LinkType)
	self.Href = hostUrl.String() + basePath + "subscriptions/" + subsIdStr
	link.Self = self
	var jsonResponse []byte
	switch subscriptionType {
	case ASSOC_STA_SUBSCRIPTION:
		var subscription AssocStaSubscription
		err = json.Unmarshal(bodyBytes, &subscription)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		subscription.Links = link
		//registration
		_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertAssocStaSubscriptionToJson(&subscription))
		registerAssocSta(&subscription, subsIdStr)
Simon Pastor
committed
		jsonResponse, err = json.Marshal(subscription)
	case STA_DATA_RATE_SUBSCRIPTION:
		var subscription StaDataRateSubscription
		err = json.Unmarshal(bodyBytes, &subscription)
		if err != nil {
			log.Error(err.Error())
			http.Error(w, err.Error(), http.StatusInternalServerError)
			return
		}
		subscription.Links = link
		//registration
		_ = rc.JSONSetEntry(baseKey+"subscriptions:"+subsIdStr, ".", convertStaDataRateSubscriptionToJson(&subscription))
		registerStaDataRate(&subscription, subsIdStr)
	default:
		nextSubscriptionIdAvailable--
		w.WriteHeader(http.StatusBadRequest)
		return
	}
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	w.WriteHeader(http.StatusCreated)
	fmt.Fprintf(w, string(jsonResponse))
}
func subscriptionsPUT(w http.ResponseWriter, r *http.Request) {
	w.Header().Set("Content-Type", "application/json; charset=UTF-8")
	vars := mux.Vars(r)
	subIdParamStr := vars["subscriptionId"]
	var subscriptionCommon SubscriptionCommon
	bodyBytes, _ := ioutil.ReadAll(r.Body)
	err := json.Unmarshal(bodyBytes, &subscriptionCommon)
	if err != nil {
		log.Error(err.Error())
		http.Error(w, err.Error(), http.StatusInternalServerError)
		return
	}
	//extract common body part
	subscriptionType := subscriptionCommon.SubscriptionType