Newer
Older
/*
* Copyright (c) 2020 InterDigital Communications, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-wais/sbi"
dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
Kevin Di Lallo
committed
met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
"github.com/gorilla/mux"
)
const waisBasePath = "/wai/v2/"
Kevin Di Lallo
committed
const waisKey = "wais:"
const logModuleWAIS = "meep-wais"
const serviceName = "WAI Service"
const (
Simon Pastor
committed
notifAssocSta = "AssocStaNotification"
notifStaDataRate = "StaDataRateNotification"
notifExpiry = "ExpiryNotification"
notifTest = "TestNotification"
Kevin Di Lallo
committed
)
var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"
const assocStaSubscriptionType = "AssocStaSubscription"
Simon Pastor
committed
const staDataRateSubscriptionType = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_SUBSCRIPTION = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_NOTIFICATION = "StaDataRateNotification"
const TEST_NOTIFICATION = "TestNotification"
var assocStaSubscriptionInfoMap = map[int]*AssocStaSubscriptionInfo{}
var staDataRateSubscriptionInfoMap = map[int]*StaDataRateSubscriptionInfo{}
Simon Pastor
committed
var subscriptionExpiryMap = map[int][]int{}
var WAIS_DB = 5
var rc *redis.Connector
var hostUrl *url.URL
var sandboxName string
var basePath string
var baseKey string
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
type ApInfoComplete struct {
ApId ApIdentity
ApLocation ApLocation
StaMacIds []string
}
type ApInfoCompleteResp struct {
ApInfoCompleteList []ApInfoComplete
}
type StaDataRateSubscriptionInfo struct {
NextTts int32 //next time to send, derived from notificationPeriod
Subscription *StaDataRateSubscription
Triggered bool
}
type AssocStaSubscriptionInfo struct {
NextTts int32 //next time to send, derived from notificationPeriod
Subscription *AssocStaSubscription
Triggered bool
}
Simon Pastor
committed
type StaData struct {
Simon Pastor
committed
}
type StaInfoResp struct {
StaInfoList []StaInfo
}
type ApInfoResp struct {
ApInfoList []ApInfo
}
Simon Pastor
committed
func notImplemented(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
w.WriteHeader(http.StatusNotImplemented)
}
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
func Init() (err error) {
// Retrieve Sandbox name from environment variable
sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
if sandboxNameEnv != "" {
sandboxName = sandboxNameEnv
}
if sandboxName == "" {
err = errors.New("MEEP_SANDBOX_NAME env variable not set")
log.Error(err.Error())
return err
}
log.Info("MEEP_SANDBOX_NAME: ", sandboxName)
// hostUrl is the url of the node serving the resourceURL
// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
if err != nil || hostUrl == nil || hostUrl.String() == "" {
hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
if err != nil {
hostUrl = new(url.URL)
}
}
log.Info("resource URL: ", hostUrl)
// Set base path
basePath = "/" + sandboxName + waisBasePath
// Get base store key
baseKey = dkm.GetKeyRoot(sandboxName) + waisKey
// Connect to Redis DB
rc, err = redis.NewConnector(redisAddr, WAIS_DB)
if err != nil {
log.Error("Failed connection to Redis DB. Error: ", err)
return err
}
_ = rc.DBFlush(baseKey)
log.Info("Connected to Redis DB, RNI service table")
reInit()
expiryTicker = time.NewTicker(time.Second)
go func() {
for range expiryTicker.C {
checkForExpiredSubscriptions()
checkAssocStaPeriodTrigger()
checkStaDataRatePeriodTrigger()
}
}()
// Initialize SBI
sbiCfg := sbi.SbiCfg{
SandboxName: sandboxName,
RedisAddr: redisAddr,
StaInfoCb: updateStaInfo,
ApInfoCb: updateApInfo,
ScenarioNameCb: updateStoreName,
CleanUpCb: cleanUp,
}
err = sbi.Init(sbiCfg)
if err != nil {
log.Error("Failed initialize SBI. Error: ", err)
return err
}
log.Info("SBI Initialized")
return nil
}
// reInit - finds the value already in the DB to repopulate local stored info
func reInit() {
//next available subsId will be overrriden if subscriptions already existed
nextSubscriptionIdAvailable = 1
keyName := baseKey + "subscription:" + "*"
_ = rc.ForEachJSONEntry(keyName, repopulateAssocStaSubscriptionMap, nil)
Simon Pastor
committed
_ = rc.ForEachJSONEntry(keyName, repopulateStaDataRateSubscriptionMap, nil)
}
// Run - Start WAIS
func Run() (err error) {
return sbi.Run()
}
// Stop - Stop WAIS
func Stop() (err error) {
return sbi.Stop()
}
func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, sumUl *int32, sumDl *int32) {
// Get STA Info from DB, if any
Simon Pastor
committed
var staData *StaData
jsonStaData, _ := rc.JSONGetEntry(baseKey+"UE:"+name, ".")
if jsonStaData != "" {
staData = convertJsonToStaData(jsonStaData)
}
var dataRate StaDataRate
if sumDl != nil {
dataRate.StaLastDataDownlinkRate = *sumDl //kbps
}
if sumUl != nil {
dataRate.StaLastDataUplinkRate = *sumUl //kbps
}
// Update DB if STA Info does not exist or has changed
Simon Pastor
committed
if staData == nil || isStaInfoUpdateRequired(staData.StaInfo, ownMacId, apMacId, rssi, &dataRate) {
// Set STA Mac ID
Simon Pastor
committed
if staData == nil {
staData = new(StaData)
staData.StaInfo = new(StaInfo)
staData.StaInfo.StaId = new(StaIdentity)
Simon Pastor
committed
staData.StaInfo.StaId.MacId = ownMacId
// Set Associated AP, if any
if apMacId == "" {
Simon Pastor
committed
staData.StaInfo.ApAssociated = nil
Simon Pastor
committed
if staData.StaInfo.ApAssociated == nil {
staData.StaInfo.ApAssociated = new(ApAssociated)
Simon Pastor
committed
staData.StaInfo.ApAssociated.Bssid = apMacId
}
// Set RSSI
if rssi != nil {
var rssiObj Rssi
rssiObj.Rssi = *rssi
Simon Pastor
committed
staData.StaInfo.Rssi = &rssiObj
Simon Pastor
committed
staData.StaInfo.Rssi = nil
Simon Pastor
committed
dataRate.StaId = staData.StaInfo.StaId
staData.StaInfo.StaDataRate = &dataRate
_ = rc.JSONSetEntry(baseKey+"UE:"+name, ".", convertStaDataToJson(staData))
Simon Pastor
committed
checkStaDataRateNotificationRegisteredSubscriptions(staData.StaInfo.StaId, dataRate.StaLastDataDownlinkRate, dataRate.StaLastDataUplinkRate, false, true)
Simon Pastor
committed
Simon Pastor
committed
func isStaInfoUpdateRequired(staInfo *StaInfo, ownMacId string, apMacId string, rssi *int32, dataRate *StaDataRate) bool {
// Check if STA Info exists
if staInfo == nil {
return true
}
// Compare STA Mac
if ownMacId != staInfo.StaId.MacId {
return true
}
// Compare AP Mac
if (apMacId == "" && staInfo.ApAssociated != nil) ||
Simon Pastor
committed
(apMacId != "" && (staInfo.ApAssociated == nil || apMacId != staInfo.ApAssociated.Bssid)) {
if (rssi == nil && staInfo.Rssi != nil) ||
(rssi != nil && staInfo.Rssi == nil) ||
(rssi != nil && staInfo.Rssi != nil && *rssi != staInfo.Rssi.Rssi) {
return true
}
Simon Pastor
committed
if dataRate.StaLastDataDownlinkRate != staInfo.StaDataRate.StaLastDataDownlinkRate || dataRate.StaLastDataUplinkRate != staInfo.StaDataRate.StaLastDataUplinkRate {
return true
}
str := fmt.Sprintf("%f", *value)
strArray := strings.Split(str, ".")
integerPart, err := strconv.Atoi(strArray[0])
if err != nil {
log.Error("Can't convert float to int")
return 0
}
fractionPart, err := strconv.Atoi(strArray[1])
if err != nil {
log.Error("Can't convert float to int")
return 0
}
//9 first bits are the integer part, last 23 bits are fraction part
valueToReturn := (integerPart << 23) + fractionPart
return int32(valueToReturn)
}
func isUpdateApInfoNeeded(jsonApInfoComplete string, newLong int32, newLat int32, staMacIds []string) bool {
var oldStaMacIds []string
var oldLat int32 = 0
var oldLong int32 = 0
if jsonApInfoComplete == "" {
return true
} else {
apInfoComplete := convertJsonToApInfoComplete(jsonApInfoComplete)
oldStaMacIds = apInfoComplete.StaMacIds
if apInfoComplete.ApLocation.Geolocation != nil {
oldLat = int32(apInfoComplete.ApLocation.Geolocation.Lat)
oldLong = int32(apInfoComplete.ApLocation.Geolocation.Long)
//if AP moved
if oldLat != newLat || oldLong != newLong {
return true
}
//if number of STAs connected changes
if len(oldStaMacIds) != len(staMacIds) {
return true
}
//if the list of connected STAs is different
return !reflect.DeepEqual(oldStaMacIds, staMacIds)
}
func updateApInfo(name string, apMacId string, longitude *float32, latitude *float32, staMacIds []string) {
//get from DB
jsonApInfoComplete, _ := rc.JSONGetEntry(baseKey+"AP:"+name, ".")
newLat := convertFloatToGeolocationFormat(latitude)
newLong := convertFloatToGeolocationFormat(longitude)
if isUpdateApInfoNeeded(jsonApInfoComplete, newLong, newLat, staMacIds) {
//updateDB
var apInfoComplete ApInfoComplete
var apLocation ApLocation
var geoLocation GeoLocation
var apId ApIdentity
Simon Pastor
committed
geoLocation.Lat = int32(newLat)
geoLocation.Long = int32(newLong)
Simon Pastor
committed
apId.Bssid = apMacId
apInfoComplete.ApId = apId
_ = rc.JSONSetEntry(baseKey+"AP:"+name, ".", convertApInfoCompleteToJson(&apInfoComplete))
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
checkAssocStaNotificationRegisteredSubscriptions(staMacIds, apMacId, false, true)
}
}
func checkAssocStaPeriodTrigger() {
//loop through every subscriptions, lower period by one and invoke the notification if a check is warranted
mutex.Lock()
defer mutex.Unlock()
if len(assocStaSubscriptionInfoMap) < 1 {
return
}
//decrease all subscriptions
//check all that applies
for _, subInfo := range assocStaSubscriptionInfoMap {
if subInfo != nil {
subInfo.NextTts--
//if no periodic check is needed, value is negative
if subInfo.NextTts != 0 {
continue
} else { //restart the nextTts and continue processing to send notification or not
subInfo.NextTts = subInfo.Subscription.NotificationPeriod
}
}
}
//find every AP info and reuse a function to store them all
var apInfoCompleteResp ApInfoCompleteResp
apInfoCompleteResp.ApInfoCompleteList = make([]ApInfoComplete, 0)
keyName := baseKey + "AP:*"
err := rc.ForEachJSONEntry(keyName, populateApInfoCompleteList, &apInfoCompleteResp)
if err != nil {
log.Error(err.Error())
return
}
//loop through the response for each AP and check subscription with no need for mutex (already used)
for _, apInfoComplete := range apInfoCompleteResp.ApInfoCompleteList {
checkAssocStaNotificationRegisteredSubscriptions(apInfoComplete.StaMacIds, apInfoComplete.ApId.Bssid, true, false)
}
}
func checkStaDataRatePeriodTrigger() {
//loop through every subscriptions, lower period by one and invoke the notification if a check is warranted
mutex.Lock()
defer mutex.Unlock()
if len(staDataRateSubscriptionInfoMap) < 1 {
return
}
//decrease all subscriptions
//check all that applies
for _, subInfo := range staDataRateSubscriptionInfoMap {
if subInfo != nil {
subInfo.NextTts--
//if no periodic check is needed, value is negative
if subInfo.NextTts != 0 {
continue
} else { //restart the nextTts and continue processing to send notification or not
subInfo.NextTts = subInfo.Subscription.NotificationPeriod
}
}
}
//find every AP info and reuse a function to store them all
var staInfoResp StaInfoResp
staInfoResp.StaInfoList = make([]StaInfo, 0)
keyName := baseKey + "UE:*"
err := rc.ForEachJSONEntry(keyName, populateStaData, &staInfoResp)
if err != nil {
log.Error(err.Error())
return
}
//loop through the response for each AP and check subscription with no need for mutex (already used)
for _, staInfo := range staInfoResp.StaInfoList {
dataRate := staInfo.StaDataRate
checkStaDataRateNotificationRegisteredSubscriptions(dataRate.StaId, dataRate.StaLastDataDownlinkRate, dataRate.StaLastDataDownlinkRate, true, false)
}
}
func checkForExpiredSubscriptions() {
nowTime := int(time.Now().Unix())
for expiryTime, subsIndexList := range subscriptionExpiryMap {
if expiryTime <= nowTime {
subscriptionExpiryMap[expiryTime] = nil
for _, subsId := range subsIndexList {
if assocStaSubscriptionInfoMap[subsId] != nil {
Simon Pastor
committed
var expiryTimeStamp TimeStamp
expiryTimeStamp.Seconds = int32(expiryTime)
link := new(ExpiryNotificationLinks)
linkType := new(LinkType)
linkType.Href = assocStaSubscriptionInfoMap[subsId].Subscription.CallbackReference
Simon Pastor
committed
link.Subscription = linkType
notif.Links = link
notif.ExpiryDeadline = &expiryTimeStamp
sendExpiryNotification(link.Subscription.Href, notif)
_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
}
if staDataRateSubscriptionInfoMap[subsId] != nil {
Simon Pastor
committed
subsIdStr := strconv.Itoa(subsId)
var notif ExpiryNotification
expiryTimeStamp.Seconds = int32(expiryTime)
Simon Pastor
committed
linkType := new(LinkType)
linkType.Href = staDataRateSubscriptionInfoMap[subsId].Subscription.CallbackReference
Simon Pastor
committed
link.Subscription = linkType
notif.Links = link
notif.ExpiryDeadline = &expiryTimeStamp
Simon Pastor
committed
sendExpiryNotification(link.Subscription.Href, notif)
_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
Simon Pastor
committed
func repopulateAssocStaSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
// Format response
err := json.Unmarshal([]byte(jsonInfo), &subscription)
if err != nil {
return err
}
selfUrl := strings.Split(subscription.Links.Self.Href, "/")
subsIdStr := selfUrl[len(selfUrl)-1]
subsId, _ := strconv.Atoi(subsIdStr)
assocStaSubscriptionInfoMap[subsId].Subscription = &subscription
assocStaSubscriptionInfoMap[subsId].NextTts = subscription.NotificationPeriod
if subscription.ExpiryDeadline != nil {
intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
intList = append(intList, subsId)
subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
}
//reinitialisation of next available Id for future subscription request
if subsId >= nextSubscriptionIdAvailable {
nextSubscriptionIdAvailable = subsId + 1
}
return nil
}
Simon Pastor
committed
func repopulateStaDataRateSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
var subscription StaDataRateSubscription
// Format response
err := json.Unmarshal([]byte(jsonInfo), &subscription)
if err != nil {
return err
}
selfUrl := strings.Split(subscription.Links.Self.Href, "/")
subsIdStr := selfUrl[len(selfUrl)-1]
subsId, _ := strconv.Atoi(subsIdStr)
mutex.Lock()
defer mutex.Unlock()
staDataRateSubscriptionInfoMap[subsId].Subscription = &subscription
staDataRateSubscriptionInfoMap[subsId].NextTts = subscription.NotificationPeriod
Simon Pastor
committed
if subscription.ExpiryDeadline != nil {
intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
intList = append(intList, subsId)
subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
}
//reinitialisation of next available Id for future subscription request
if subsId >= nextSubscriptionIdAvailable {
nextSubscriptionIdAvailable = subsId + 1
}
return nil
}
func checkAssocStaNotificationRegisteredSubscriptions(staMacIds []string, apMacId string, isPeriodicInvoked bool, needMutex bool) {
if needMutex {
mutex.Lock()
defer mutex.Unlock()
}
for subsId, subInfo := range assocStaSubscriptionInfoMap {
if subInfo == nil {
break
}
sub := subInfo.Subscription
//if notification is triggered by events but that periodic notification is registered, skip
Simon Pastor
committed
if sub.NotificationPeriod != 0 && !isPeriodicInvoked {
continue
}
//if notification is periodic, only trigger on period expiry (period expired when notidication period is maxed out)
if sub.NotificationPeriod != subInfo.NextTts && isPeriodicInvoked {
continue
}
Simon Pastor
committed
if sub.ApId.Bssid == apMacId {
Simon Pastor
committed
if match {
if sub.NotificationEvent != nil {
match = false
switch sub.NotificationEvent.Trigger {
case "1":
if len(staMacIds) >= int(sub.NotificationEvent.Threshold) {
match = true
}
case "2":
if len(staMacIds) <= int(sub.NotificationEvent.Threshold) {
match = true
}
default:
}
//if the notification already triggered, do not send it again unless its a periodic event
if match {
if !isPeriodicInvoked {
if !subInfo.Triggered {
assocStaSubscriptionInfoMap[subsId].Triggered = true
} else {
match = false
}
}
} else {
assocStaSubscriptionInfoMap[subsId].Triggered = false
}
Simon Pastor
committed
}
}
if match {
subsIdStr := strconv.Itoa(subsId)
log.Info("Sending WAIS notification ", sub.CallbackReference)
Simon Pastor
committed
apId.Bssid = apMacId
notif.ApId = &apId
for _, staMacId := range staMacIds {
staId.MacId = staMacId
notif.StaId = append(notif.StaId, staId)
}
log.Info("Assoc Sta Notification" + "(" + subsIdStr + ")")
}
}
}
}
func checkStaDataRateNotificationRegisteredSubscriptions(staId *StaIdentity, dataRateDl int32, dataRateUl int32, isPeriodicInvoked bool, needMutex bool) {
Simon Pastor
committed
if needMutex {
mutex.Lock()
defer mutex.Unlock()
}
Simon Pastor
committed
//check all that applies
for subsId, subInfo := range staDataRateSubscriptionInfoMap {
log.Info("SIMON check")
if subInfo == nil {
break
}
sub := subInfo.Subscription
Simon Pastor
committed
match := false
if sub != nil {
//if notification is triggered by events but that periodic notification is registered, skip
Simon Pastor
committed
if sub.NotificationPeriod != 0 && !isPeriodicInvoked {
continue
}
//if notification is periodic, only trigger on period expiry (period expired when notidication period is maxed out)
if sub.NotificationPeriod != subInfo.NextTts && isPeriodicInvoked {
continue
}
Simon Pastor
committed
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
notifToSend := false
var staDataRateList []StaDataRate
for _, subStaId := range sub.StaId {
//check to match every values and at least one when its an array
if staId.MacId != subStaId.MacId {
continue
}
if staId.Aid != subStaId.Aid {
continue
}
match = true
for _, ssid := range subStaId.Ssid {
match = false
//can only have one ssid at a time
if ssid == staId.Ssid[0] {
match = true
break
}
}
if match {
for _, ipAddress := range subStaId.IpAddress {
match = false
//can only have one ip address
if ipAddress == staId.IpAddress[0] {
match = true
break
}
}
}
if match {
if sub.NotificationEvent != nil {
log.Info("SIMON event DL ", dataRateDl, "---", sub.NotificationEvent.DownlinkRateThreshold)
log.Info("SIMON event UL ", dataRateUl, "---", sub.NotificationEvent.UplinkRateThreshold)
Simon Pastor
committed
match = false
switch sub.NotificationEvent.Trigger {
case "1":
if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold {
match = true
}
case "2":
if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold {
match = true
}
case "3":
if dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
match = true
}
case "4":
if dataRateUl <= sub.NotificationEvent.UplinkRateThreshold {
Simon Pastor
committed
match = true
}
case "5":
if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold && dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
match = true
}
case "6":
if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold && dataRateUl <= sub.NotificationEvent.UplinkRateThreshold {
match = true
}
case "7":
if dataRateDl >= sub.NotificationEvent.DownlinkRateThreshold || dataRateUl >= sub.NotificationEvent.UplinkRateThreshold {
match = true
}
case "8":
if dataRateDl <= sub.NotificationEvent.DownlinkRateThreshold || dataRateUl <= sub.NotificationEvent.UplinkRateThreshold {
match = true
}
default:
}
//if the notification already triggered, do not send it again unless its a periodic event
if match {
log.Info("SIMON need to send")
if !isPeriodicInvoked {
if !subInfo.Triggered {
staDataRateSubscriptionInfoMap[subsId].Triggered = true
} else {
match = false
}
}
} else {
log.Info("SIMON reset trigger")
staDataRateSubscriptionInfoMap[subsId].Triggered = false
}
Simon Pastor
committed
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
}
}
if match {
var staDataRate StaDataRate
staDataRate.StaId = staId
staDataRate.StaLastDataDownlinkRate = dataRateDl
staDataRate.StaLastDataUplinkRate = dataRateUl
staDataRateList = append(staDataRateList, staDataRate)
notifToSend = true
}
}
if notifToSend {
subsIdStr := strconv.Itoa(subsId)
log.Info("Sending WAIS notification ", sub.CallbackReference)
var notif StaDataRateNotification
seconds := time.Now().Unix()
var timeStamp TimeStamp
timeStamp.Seconds = int32(seconds)
notif.TimeStamp = &timeStamp
notif.NotificationType = STA_DATA_RATE_NOTIFICATION
notif.StaDataRate = staDataRateList
sendStaDataRateNotification(sub.CallbackReference, notif)
log.Info("Sta Data Rate Notification" + "(" + subsIdStr + ")")
}
}
}
}
func sendTestNotification(notifyUrl string, linkType *LinkType) {
var notification TestNotification
notification.NotificationType = TEST_NOTIFICATION
link := new(ExpiryNotificationLinks)
link.Subscription = linkType
notification.Links = link
startTime := time.Now()
jsonNotif, err := json.Marshal(notification)
if err != nil {
log.Error(err.Error())
}
resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
duration := float64(time.Since(startTime).Microseconds()) / 1000.0
_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
if err != nil {
log.Error(err)
met.ObserveNotification(sandboxName, serviceName, notifTest, notifyUrl, nil, duration)
return
}
met.ObserveNotification(sandboxName, serviceName, notifTest, notifyUrl, resp, duration)
defer resp.Body.Close()
}
func sendAssocStaNotification(notifyUrl string, notification AssocStaNotification) {
jsonNotif, err := json.Marshal(notification)
if err != nil {
log.Error(err.Error())
}
resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
Kevin Di Lallo
committed
duration := float64(time.Since(startTime).Microseconds()) / 1000.0
_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
if err != nil {
log.Error(err)
Kevin Di Lallo
committed
met.ObserveNotification(sandboxName, serviceName, notifAssocSta, notifyUrl, nil, duration)
Kevin Di Lallo
committed
met.ObserveNotification(sandboxName, serviceName, notifAssocSta, notifyUrl, resp, duration)
Simon Pastor
committed
func sendStaDataRateNotification(notifyUrl string, notification StaDataRateNotification) {
startTime := time.Now()
jsonNotif, err := json.Marshal(notification)
if err != nil {
log.Error(err.Error())
}
resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
duration := float64(time.Since(startTime).Microseconds()) / 1000.0
_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
if err != nil {
log.Error(err)
met.ObserveNotification(sandboxName, serviceName, notifStaDataRate, notifyUrl, nil, duration)
return
}
met.ObserveNotification(sandboxName, serviceName, notifStaDataRate, notifyUrl, resp, duration)
defer resp.Body.Close()
}
func sendExpiryNotification(notifyUrl string, notification ExpiryNotification) {
startTime := time.Now()
jsonNotif, err := json.Marshal(notification)
if err != nil {
log.Error(err.Error())
}
resp, err := http.Post(notifyUrl, "application/json", bytes.NewBuffer(jsonNotif))
Kevin Di Lallo
committed
duration := float64(time.Since(startTime).Microseconds()) / 1000.0
_ = httpLog.LogTx(notifyUrl, "POST", string(jsonNotif), resp, startTime)
if err != nil {
log.Error(err)
Kevin Di Lallo
committed
met.ObserveNotification(sandboxName, serviceName, notifExpiry, notifyUrl, nil, duration)
Kevin Di Lallo
committed
met.ObserveNotification(sandboxName, serviceName, notifExpiry, notifyUrl, resp, duration)
}
func subscriptionsGET(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
vars := mux.Vars(r)
subIdParamStr := vars["subscriptionId"]
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
jsonRespDB, _ := rc.JSONGetEntry(baseKey+"subscriptions:"+subIdParamStr, ".")
if jsonRespDB == "" {
w.WriteHeader(http.StatusNotFound)
return
}
var subscriptionCommon SubscriptionCommon
err := json.Unmarshal([]byte(jsonRespDB), &subscriptionCommon)
if err != nil {
log.Error(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
var jsonResponse []byte
switch subscriptionCommon.SubscriptionType {
case ASSOC_STA_SUBSCRIPTION:
var subscription AssocStaSubscription
err = json.Unmarshal([]byte(jsonRespDB), &subscription)
if err != nil {
log.Error(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
Simon Pastor
committed
jsonResponse, err = json.Marshal(subscription)
case STA_DATA_RATE_SUBSCRIPTION:
var subscription StaDataRateSubscription
err = json.Unmarshal([]byte(jsonRespDB), &subscription)
if err != nil {
log.Error(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
jsonResponse, err = json.Marshal(subscription)
default:
log.Error("Unknown subscription type")
w.WriteHeader(http.StatusBadRequest)
return
}
if err != nil {
log.Error(err.Error())
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
w.WriteHeader(http.StatusOK)
fmt.Fprintf(w, string(jsonResponse))
}
func isSubscriptionIdRegisteredAssocSta(subsIdStr string) bool {
if assocStaSubscriptionInfoMap[subsId] != nil {
return true
} else {
return false
}
}
Simon Pastor
committed
func isSubscriptionIdRegisteredStaDataRate(subsIdStr string) bool {
subsId, _ := strconv.Atoi(subsIdStr)
mutex.Lock()
defer mutex.Unlock()
if staDataRateSubscriptionInfoMap[subsId] != nil {
Simon Pastor
committed
return true
} else {
return false
}
}
func registerAssocSta(subscription *AssocStaSubscription, subsIdStr string) {
assocStaSubscriptionInfo := AssocStaSubscriptionInfo{subscription.NotificationPeriod, subscription, false}
assocStaSubscriptionInfoMap[subsId] = &assocStaSubscriptionInfo
if subscription.ExpiryDeadline != nil {
//get current list of subscription meant to expire at this time
intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
intList = append(intList, subsId)
subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
}
log.Info("New registration: ", subsId, " type: ", subscription.SubscriptionType)
Simon Pastor
committed
if subscription.RequestTestNotification {
sendTestNotification(subscription.CallbackReference, subscription.Links.Self)
}
}
func registerStaDataRate(subscription *StaDataRateSubscription, subsIdStr string) {
subsId, _ := strconv.Atoi(subsIdStr)
mutex.Lock()
defer mutex.Unlock()