Newer
Older
/*
* Copyright (c) 2020 InterDigital Communications, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package server
import (
"net/http"
"net/url"
"os"
"reflect"
"strconv"
"strings"
"time"
sbi "github.com/InterDigitalInc/AdvantEDGE/go-apps/meep-wais/sbi"
appInfoClient "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-app-info-client"
appSupportClient "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-app-support-client"
dkm "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-data-key-mgr"
httpLog "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-http-logger"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
Kevin Di Lallo
committed
met "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-metrics"
redis "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis"
srvMgmtClient "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-service-mgmt-client"
const waisBasePath = "wai/v2/"
const waisKey = "wais"
Kevin Di Lallo
committed
const logModuleWAIS = "meep-wais"
const serviceName = "WAI Service"
const defaultMepName = "global"
const defaultScopeOfLocality = "MEC_SYSTEM"
const defaultConsumedLocalOnly = true
Kevin Di Lallo
committed
const (
Simon Pastor
committed
notifAssocSta = "AssocStaNotification"
notifStaDataRate = "StaDataRateNotification"
notifExpiry = "ExpiryNotification"
notifTest = "TestNotification"
Kevin Di Lallo
committed
)
var redisAddr string = "meep-redis-master.default.svc.cluster.local:6379"
var influxAddr string = "http://meep-influxdb.default.svc.cluster.local:8086"
const assocStaSubscriptionType = "AssocStaSubscription"
Simon Pastor
committed
const staDataRateSubscriptionType = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_SUBSCRIPTION = "StaDataRateSubscription"
Simon Pastor
committed
const STA_DATA_RATE_NOTIFICATION = "StaDataRateNotification"
const MEASUREMENT_REPORT_SUBSCRIPTION = "MeasurementReportSubscription"
Simon Pastor
committed
const TEST_NOTIFICATION = "TestNotification"
var assocStaSubscriptionInfoMap = map[int]*AssocStaSubscriptionInfo{}
var staDataRateSubscriptionInfoMap = map[int]*StaDataRateSubscriptionInfo{}
Simon Pastor
committed
var rc *redis.Connector
var hostUrl *url.URL
var sandboxName string
var mepName string = defaultMepName
var scopeOfLocality string = defaultScopeOfLocality
var consumedLocalOnly bool = defaultConsumedLocalOnly
var locality []string
var expiryTicker *time.Ticker
var nextSubscriptionIdAvailable int
// ApInfoComplete is the per-AP record that is serialized to JSON and stored
// in the DB under the "AP:<name>" key.
type ApInfoComplete struct {
// ApId identifies the access point; its Bssid is filled from the AP MAC ID.
ApId ApIdentity
// ApLocation carries the AP location, including the optional Geolocation.
ApLocation ApLocation
// StaMacIds lists STA MAC IDs reported together with this AP
// (presumably the STAs currently associated with it - TODO confirm).
StaMacIds []string
}
// ApInfoCompleteResp accumulates ApInfoComplete records; it is passed as the
// user data when iterating over every "AP:*" entry of the DB.
type ApInfoCompleteResp struct {
ApInfoCompleteList []ApInfoComplete
}
// StaDataRateSubscriptionInfo is the in-memory bookkeeping kept for one
// StaDataRate subscription (stored in staDataRateSubscriptionInfoMap, keyed
// by subscription ID).
type StaDataRateSubscriptionInfo struct {
NextTts int32 //next time to send, derived from notificationPeriod
// NotificationCheckReady is reset to false when the entry is repopulated from
// the DB so that no notification is sent right away.
NotificationCheckReady bool
// Subscription points to the subscription this entry tracks.
Subscription *StaDataRateSubscription
// Triggered presumably records that a notification was already sent for the
// current condition - TODO confirm against the notification check logic.
Triggered bool
}
type AssocStaSubscriptionInfo struct {
NextTts int32 //next time to send, derived from notificationPeriod
NotificationCheckReady bool
Subscription *AssocStaSubscription
Triggered bool
Simon Pastor
committed
type StaData struct {
Simon Pastor
committed
}
// StaInfoResp accumulates StaInfo records; it is passed as the user data when
// iterating over every "UE:*" entry of the DB.
type StaInfoResp struct {
StaInfoList []StaInfo
}
// ApInfoResp wraps a list of ApInfo records
// (presumably the AP counterpart of StaInfoResp - TODO confirm usage).
type ApInfoResp struct {
ApInfoList []ApInfo
}
const serviceAppName = "WAI"
const serviceAppVersion = "2.1.1"
var serviceAppInstanceId string
var appEnablementUrl string
var appEnablementAppSupportClient *appSupportClient.APIClient
var appEnablementSrvMgmtClient *srvMgmtClient.APIClient
var appEnablementAppInfoClient *appInfoClient.APIClient
var registrationTicker *time.Ticker
Simon Pastor
committed
// notImplemented - generic handler for endpoints that are not supported.
// Replies with a JSON content type and HTTP 501, without any body.
func notImplemented(w http.ResponseWriter, r *http.Request) {
	const jsonContentType = "application/json; charset=UTF-8"

	headers := w.Header()
	headers.Set("Content-Type", jsonContentType)
	w.WriteHeader(http.StatusNotImplemented)
}
func Init() (err error) {
// Retrieve Sandbox name from environment variable
sandboxNameEnv := strings.TrimSpace(os.Getenv("MEEP_SANDBOX_NAME"))
if sandboxNameEnv != "" {
sandboxName = sandboxNameEnv
}
if sandboxName == "" {
err = errors.New("MEEP_SANDBOX_NAME env variable not set")
log.Error(err.Error())
return err
}
log.Info("MEEP_SANDBOX_NAME: ", sandboxName)
// hostUrl is the url of the node serving the resourceURL
// Retrieve public url address where service is reachable, if not present, use Host URL environment variable
hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_PUBLIC_URL")))
if err != nil || hostUrl == nil || hostUrl.String() == "" {
hostUrl, err = url.Parse(strings.TrimSpace(os.Getenv("MEEP_HOST_URL")))
if err != nil {
hostUrl = new(url.URL)
}
}
log.Info("MEEP_HOST_URL: ", hostUrl)
// Get MEP name
mepNameEnv := strings.TrimSpace(os.Getenv("MEEP_MEP_NAME"))
if mepNameEnv != "" {
mepName = mepNameEnv
}
log.Info("MEEP_MEP_NAME: ", mepName)
// Get App Enablement URL
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
appEnablementEnv := strings.TrimSpace(os.Getenv("MEEP_APP_ENABLEMENT"))
if appEnablementEnv != "" {
appEnablementUrl = "http://" + appEnablementEnv
appEnablementEnabled = true
}
log.Info("MEEP_APP_ENABLEMENT: ", appEnablementUrl)
// Get scope of locality
scopeOfLocalityEnv := strings.TrimSpace(os.Getenv("MEEP_SCOPE_OF_LOCALITY"))
if scopeOfLocalityEnv != "" {
scopeOfLocality = scopeOfLocalityEnv
}
log.Info("MEEP_SCOPE_OF_LOCALITY: ", scopeOfLocality)
// Get local consumption
consumedLocalOnlyEnv := strings.TrimSpace(os.Getenv("MEEP_CONSUMED_LOCAL_ONLY"))
if consumedLocalOnlyEnv != "" {
value, err := strconv.ParseBool(consumedLocalOnlyEnv)
if err == nil {
consumedLocalOnly = value
}
}
log.Info("MEEP_CONSUMED_LOCAL_ONLY: ", consumedLocalOnly)
// Get locality
localityEnv := strings.TrimSpace(os.Getenv("MEEP_LOCALITY"))
if localityEnv != "" {
locality = strings.Split(localityEnv, ":")
}
log.Info("MEEP_LOCALITY: ", locality)
if mepName == defaultMepName {
basePath = "/" + sandboxName + "/" + waisBasePath
} else {
basePath = "/" + sandboxName + "/" + mepName + "/" + waisBasePath
}
// Set base storage key
baseKey = dkm.GetKeyRoot(sandboxName) + waisKey + ":mep:" + mepName + ":"
// Connect to Redis DB
rc, err = redis.NewConnector(redisAddr, WAIS_DB)
if err != nil {
log.Error("Failed connection to Redis DB. Error: ", err)
return err
}
_ = rc.DBFlush(baseKey)
reInit()
expiryTicker = time.NewTicker(time.Second)
go func() {
for range expiryTicker.C {
checkForExpiredSubscriptions()
checkAssocStaPeriodTrigger()
checkStaDataRatePeriodTrigger()
}
}()
// Initialize SBI
sbiCfg := sbi.SbiCfg{
SandboxName: sandboxName,
RedisAddr: redisAddr,
StaInfoCb: updateStaInfo,
ApInfoCb: updateApInfo,
ScenarioNameCb: updateStoreName,
CleanUpCb: cleanUp,
}
err = sbi.Init(sbiCfg)
if err != nil {
log.Error("Failed initialize SBI. Error: ", err)
return err
}
log.Info("SBI Initialized")
// Create App Enablement REST clients
if appEnablementEnabled {
// Create App Info client
appInfoClientCfg := appInfoClient.NewConfiguration()
appInfoClientCfg.BasePath = appEnablementUrl + "/app_info/v1"
appEnablementAppInfoClient = appInfoClient.NewAPIClient(appInfoClientCfg)
if appEnablementAppInfoClient == nil {
return errors.New("Failed to create App Info REST API client")
}
// Create App Support client
appSupportClientCfg := appSupportClient.NewConfiguration()
appSupportClientCfg.BasePath = appEnablementUrl + "/mec_app_support/v1"
appEnablementAppSupportClient = appSupportClient.NewAPIClient(appSupportClientCfg)
if appEnablementAppSupportClient == nil {
return errors.New("Failed to create App Enablement App Support REST API client")
}
// Create App Info client
srvMgmtClientCfg := srvMgmtClient.NewConfiguration()
srvMgmtClientCfg.BasePath = appEnablementUrl + "/mec_service_mgmt/v1"
appEnablementSrvMgmtClient = srvMgmtClient.NewAPIClient(srvMgmtClientCfg)
if appEnablementSrvMgmtClient == nil {
return errors.New("Failed to create App Enablement Service Management REST API client")
}
log.Info("WAIS successfully initialized")
return nil
}
// reInit - finds the value already in the DB to repopulate local stored info
func reInit() {
//next available subsId will be overrriden if subscriptions already existed
nextSubscriptionIdAvailable = 1
keyName := baseKey + "subscription:" + "*"
_ = rc.ForEachJSONEntry(keyName, repopulateAssocStaSubscriptionMap, nil)
Simon Pastor
committed
_ = rc.ForEachJSONEntry(keyName, repopulateStaDataRateSubscriptionMap, nil)
}
// Run - Start WAIS
// Kicks off the MEC Service registration ticker when App Enablement is
// enabled, then hands over to the SBI and reports its result.
func Run() (err error) {
	registrationNeeded := appEnablementEnabled
	if registrationNeeded {
		// Start MEC Service registration ticker
		startRegistrationTicker()
	}

	err = sbi.Run()
	return err
}
// Stop - Stop WAIS
func Stop() (err error) {
// Stop MEC Service registration ticker
if appEnablementEnabled {
stopRegistrationTicker()
err = sendTerminationConfirmation(serviceAppInstanceId)
func startRegistrationTicker() {
// Wait a few seconds to allow App Enablement Service to start.
// This is done to avoid the default 20 second TCP socket connect timeout
// if the App Enablement Service is not yet running.
log.Info("Waiting for App Enablement Service to start")
time.Sleep(5 * time.Second)
go func() {
mecAppReadySent := false
// registrationSent := false
for range registrationTicker.C {
// Get Application instance ID if not already available
if serviceAppInstanceId == "" {
var err error
serviceAppInstanceId, err = getAppInstanceId(serviceAppName, serviceAppVersion)
if err != nil || serviceAppInstanceId == "" {
continue
}
}
// Send App Ready message
if !mecAppReadySent {
err := sendReadyConfirmation(serviceAppInstanceId)
if err != nil {
log.Error("Failure when sending the MecAppReady message. Error: ", err)
continue
}
mecAppReadySent = true
}
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
// Register service instance
// if !registrationSent {
err := registerService(serviceAppInstanceId, serviceAppName, serviceAppVersion)
if err != nil {
log.Error("Failed to register to appEnablement DB, keep trying. Error: ", err)
continue
}
// registrationSent = true
// }
// // Register for graceful termination
// if !subscriptionSent {
// err = subscribeAppTermination(serviceAppInstanceId)
// if err != nil {
// log.Error("Failed to subscribe to graceful termination. Error: ", err)
// continue
// }
// sendAppTerminationWhenDone = true
// subscriptionSent = true
// }
// Registration complete
log.Info("Successfully registered with App Enablement Service")
stopRegistrationTicker()
return
}
}()
func stopRegistrationTicker() {
if registrationTicker != nil {
log.Info("Stopping App Enablement registration ticker")
registrationTicker.Stop()
registrationTicker = nil
}
func getAppInstanceId(appName string, appVersion string) (id string, err error) {
var appInfo appInfoClient.ApplicationInfo
appInfo.AppName = appName
appInfo.Version = appVersion
state := appInfoClient.INACTIVE_ApplicationState
appInfo.State = &state
appInfoResponse, _, err := appEnablementAppInfoClient.AppsApi.ApplicationsPOST(context.TODO(), appInfo)
log.Error("Failed to get App Instance ID with error: ", err)
return "", err
return appInfoResponse.AppInstanceId, nil
func registerService(appInstanceId string, appName string, appVersion string) error {
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
var srvInfo srvMgmtClient.ServiceInfoPost
//serName
srvInfo.SerName = appName
//version
srvInfo.Version = appVersion
//state
state := srvMgmtClient.ACTIVE_ServiceState
srvInfo.State = &state
//serializer
serializer := srvMgmtClient.JSON_SerializerType
srvInfo.Serializer = &serializer
//transportInfo
var transportInfo srvMgmtClient.TransportInfo
transportInfo.Id = "transport"
transportInfo.Name = "REST"
transportType := srvMgmtClient.REST_HTTP_TransportType
transportInfo.Type_ = &transportType
transportInfo.Protocol = "HTTP"
transportInfo.Version = "2.0"
var endpoint srvMgmtClient.OneOfTransportInfoEndpoint
endpointPath := hostUrl.String() + basePath
endpoint.Uris = append(endpoint.Uris, endpointPath)
transportInfo.Endpoint = &endpoint
srvInfo.TransportInfo = &transportInfo
//serCategory
var category srvMgmtClient.CategoryRef
category.Href = "catalogueHref"
category.Id = "waiId"
category.Name = "WAI"
category.Version = "v2"
srvInfo.SerCategory = &category
//scopeOfLocality
localityType := srvMgmtClient.LocalityType(scopeOfLocality)
srvInfo.ScopeOfLocality = &localityType
srvInfo.ConsumedLocalOnly = consumedLocalOnly
appServicesPostResponse, _, err := appEnablementSrvMgmtClient.AppServicesApi.AppServicesPOST(context.TODO(), srvInfo, appInstanceId)
if err != nil {
log.Error("Failed to register the service to app enablement registry: ", err)
return err
}
log.Info("Application Enablement Service instance Id: ", appServicesPostResponse.SerInstanceId)
return nil
}
func sendReadyConfirmation(appInstanceId string) error {
var appReady appSupportClient.AppReadyConfirmation
indication := appSupportClient.READY_ReadyIndicationType
appReady.Indication = &indication
_, err := appEnablementAppSupportClient.AppConfirmReadyApi.ApplicationsConfirmReadyPOST(context.TODO(), appReady, appInstanceId)
if err != nil {
log.Error("Failed to send a ready confirm acknowlegement: ", err)
func sendTerminationConfirmation(appInstanceId string) error {
var appTermination appSupportClient.AppTerminationConfirmation
operationAction := appSupportClient.TERMINATING_OperationActionType
appTermination.OperationAction = &operationAction
_, err := appEnablementAppSupportClient.AppConfirmTerminationApi.ApplicationsConfirmTerminationPOST(context.TODO(), appTermination, appInstanceId)
log.Error("Failed to send a confirm termination acknowlegement: ", err)
// func subscribeAppTermination(appInstanceId string) error {
// var subscription appSupportClient.AppTerminationNotificationSubscription
// subscription.SubscriptionType = "AppTerminationNotificationSubscription"
// subscription.AppInstanceId = appInstanceId
// subscription.CallbackReference = hostUrl.String() + basePath
// _, _, err := appEnablementAppSupportClient.AppSubscriptionsApi.ApplicationsSubscriptionsPOST(context.TODO(), subscription, appInstanceId)
// if err != nil {
// log.Error("Failed to register to App Support subscription: ", err)
// return err
// }
// return nil
// }
func updateStaInfo(name string, ownMacId string, apMacId string, rssi *int32, sumUl *int32, sumDl *int32) {
// Get STA Info from DB, if any
Simon Pastor
committed
var staData *StaData
jsonStaData, _ := rc.JSONGetEntry(baseKey+"UE:"+name, ".")
if jsonStaData != "" {
staData = convertJsonToStaData(jsonStaData)
}
var dataRate StaDataRate
if sumDl != nil {
dataRate.StaLastDataDownlinkRate = *sumDl //kbps
}
if sumUl != nil {
dataRate.StaLastDataUplinkRate = *sumUl //kbps
}
// Update DB if STA Info does not exist or has changed
Simon Pastor
committed
if staData == nil || isStaInfoUpdateRequired(staData.StaInfo, ownMacId, apMacId, rssi, &dataRate) {
// Set STA Mac ID
Simon Pastor
committed
if staData == nil {
staData = new(StaData)
staData.StaInfo = new(StaInfo)
staData.StaInfo.StaId = new(StaIdentity)
Simon Pastor
committed
staData.StaInfo.StaId.MacId = ownMacId
// Set Associated AP, if any
if apMacId == "" {
Simon Pastor
committed
staData.StaInfo.ApAssociated = nil
Simon Pastor
committed
if staData.StaInfo.ApAssociated == nil {
staData.StaInfo.ApAssociated = new(ApAssociated)
Simon Pastor
committed
staData.StaInfo.ApAssociated.Bssid = apMacId
}
// Set RSSI
if rssi != nil {
var rssiObj Rssi
rssiObj.Rssi = *rssi
Simon Pastor
committed
staData.StaInfo.Rssi = &rssiObj
Simon Pastor
committed
staData.StaInfo.Rssi = nil
//no need to populate, repetitive since populated in the StaInfo
//dataRate.StaId = staData.StaInfo.StaId
Simon Pastor
committed
staData.StaInfo.StaDataRate = &dataRate
_ = rc.JSONSetEntry(baseKey+"UE:"+name, ".", convertStaDataToJson(staData))
checkStaDataRateNotificationRegisteredSubscriptions(staData.StaInfo.StaId, dataRate.StaLastDataDownlinkRate, dataRate.StaLastDataUplinkRate, true)
Simon Pastor
committed
Simon Pastor
committed
func isStaInfoUpdateRequired(staInfo *StaInfo, ownMacId string, apMacId string, rssi *int32, dataRate *StaDataRate) bool {
// Check if STA Info exists
if staInfo == nil {
return true
}
// Compare STA Mac
if ownMacId != staInfo.StaId.MacId {
return true
}
// Compare AP Mac
if (apMacId == "" && staInfo.ApAssociated != nil) ||
Simon Pastor
committed
(apMacId != "" && (staInfo.ApAssociated == nil || apMacId != staInfo.ApAssociated.Bssid)) {
if (rssi == nil && staInfo.Rssi != nil) ||
(rssi != nil && staInfo.Rssi == nil) ||
(rssi != nil && staInfo.Rssi != nil && *rssi != staInfo.Rssi.Rssi) {
return true
}
Simon Pastor
committed
if dataRate.StaLastDataDownlinkRate != staInfo.StaDataRate.StaLastDataDownlinkRate || dataRate.StaLastDataUplinkRate != staInfo.StaDataRate.StaLastDataUplinkRate {
return true
}
str := fmt.Sprintf("%f", *value)
strArray := strings.Split(str, ".")
integerPart, err := strconv.Atoi(strArray[0])
if err != nil {
log.Error("Can't convert float to int")
return 0
}
fractionPart, err := strconv.Atoi(strArray[1])
if err != nil {
log.Error("Can't convert float to int")
return 0
}
//9 first bits are the integer part, last 23 bits are fraction part
valueToReturn := (integerPart << 23) + fractionPart
return int32(valueToReturn)
}
func isUpdateApInfoNeeded(jsonApInfoComplete string, newLong int32, newLat int32, staMacIds []string) bool {
var oldStaMacIds []string
var oldLat int32 = 0
var oldLong int32 = 0
if jsonApInfoComplete == "" {
return true
} else {
apInfoComplete := convertJsonToApInfoComplete(jsonApInfoComplete)
oldStaMacIds = apInfoComplete.StaMacIds
if apInfoComplete.ApLocation.Geolocation != nil {
oldLat = int32(apInfoComplete.ApLocation.Geolocation.Lat)
oldLong = int32(apInfoComplete.ApLocation.Geolocation.Long)
//if AP moved
if oldLat != newLat || oldLong != newLong {
return true
}
//if number of STAs connected changes
if len(oldStaMacIds) != len(staMacIds) {
return true
}
//if the list of connected STAs is different
return !reflect.DeepEqual(oldStaMacIds, staMacIds)
}
func updateApInfo(name string, apMacId string, longitude *float32, latitude *float32, staMacIds []string) {
//get from DB
jsonApInfoComplete, _ := rc.JSONGetEntry(baseKey+"AP:"+name, ".")
newLat := convertFloatToGeolocationFormat(latitude)
newLong := convertFloatToGeolocationFormat(longitude)
if isUpdateApInfoNeeded(jsonApInfoComplete, newLong, newLat, staMacIds) {
//updateDB
var apInfoComplete ApInfoComplete
var apLocation ApLocation
var geoLocation GeoLocation
var apId ApIdentity
Simon Pastor
committed
geoLocation.Lat = int32(newLat)
geoLocation.Long = int32(newLong)
Simon Pastor
committed
apId.Bssid = apMacId
apInfoComplete.ApId = apId
_ = rc.JSONSetEntry(baseKey+"AP:"+name, ".", convertApInfoCompleteToJson(&apInfoComplete))
checkAssocStaNotificationRegisteredSubscriptions(staMacIds, apMacId, true)
}
}
func checkAssocStaPeriodTrigger() {
//loop through every subscriptions, lower period by one and invoke the notification if a check is warranted
mutex.Lock()
defer mutex.Unlock()
if len(assocStaSubscriptionInfoMap) < 1 {
return
}
//decrease all subscriptions
//check all that applies
for _, subInfo := range assocStaSubscriptionInfoMap {
if subInfo != nil {
} else {
//subInfo.NextTts = subInfo.Subscription.NotificationPeriod
subInfo.NotificationCheckReady = false
}
}
}
//find every AP info and reuse a function to store them all
var apInfoCompleteResp ApInfoCompleteResp
apInfoCompleteResp.ApInfoCompleteList = make([]ApInfoComplete, 0)
keyName := baseKey + "AP:*"
err := rc.ForEachJSONEntry(keyName, populateApInfoCompleteList, &apInfoCompleteResp)
if err != nil {
log.Error(err.Error())
return
}
//loop through the response for each AP and check subscription with no need for mutex (already used)
for _, apInfoComplete := range apInfoCompleteResp.ApInfoCompleteList {
checkAssocStaNotificationRegisteredSubscriptions(apInfoComplete.StaMacIds, apInfoComplete.ApId.Bssid, false)
}
}
func checkStaDataRatePeriodTrigger() {
//loop through every subscriptions, lower period by one and invoke the notification if a check is warranted
mutex.Lock()
defer mutex.Unlock()
if len(staDataRateSubscriptionInfoMap) < 1 {
return
}
//decrease all subscriptions
//check all that applies
for _, subInfo := range staDataRateSubscriptionInfoMap {
if subInfo != nil {
} else {
//subInfo.NextTts = subInfo.Subscription.NotificationPeriod
subInfo.NotificationCheckReady = false
}
}
}
//find every AP info and reuse a function to store them all
var staInfoResp StaInfoResp
staInfoResp.StaInfoList = make([]StaInfo, 0)
keyName := baseKey + "UE:*"
err := rc.ForEachJSONEntry(keyName, populateStaData, &staInfoResp)
if err != nil {
log.Error(err.Error())
return
}
//loop through the response for each AP and check subscription with no need for mutex (already used)
for _, staInfo := range staInfoResp.StaInfoList {
dataRate := staInfo.StaDataRate
checkStaDataRateNotificationRegisteredSubscriptions(staInfo.StaId, dataRate.StaLastDataDownlinkRate, dataRate.StaLastDataDownlinkRate, false)
}
}
func checkForExpiredSubscriptions() {
nowTime := int(time.Now().Unix())
for expiryTime, subsIndexList := range subscriptionExpiryMap {
if expiryTime <= nowTime {
subscriptionExpiryMap[expiryTime] = nil
for _, subsId := range subsIndexList {
if assocStaSubscriptionInfoMap[subsId] != nil {
Simon Pastor
committed
var expiryTimeStamp TimeStamp
expiryTimeStamp.Seconds = int32(expiryTime)
link := new(ExpiryNotificationLinks)
linkType := new(LinkType)
linkType.Href = assocStaSubscriptionInfoMap[subsId].Subscription.CallbackReference
Simon Pastor
committed
link.Subscription = linkType
notif.Links = link
notif.ExpiryDeadline = &expiryTimeStamp
sendExpiryNotification(link.Subscription.Href, notif)
_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
}
if staDataRateSubscriptionInfoMap[subsId] != nil {
Simon Pastor
committed
subsIdStr := strconv.Itoa(subsId)
var notif ExpiryNotification
expiryTimeStamp.Seconds = int32(expiryTime)
Simon Pastor
committed
linkType := new(LinkType)
linkType.Href = staDataRateSubscriptionInfoMap[subsId].Subscription.CallbackReference
Simon Pastor
committed
link.Subscription = linkType
notif.Links = link
notif.ExpiryDeadline = &expiryTimeStamp
Simon Pastor
committed
sendExpiryNotification(link.Subscription.Href, notif)
_ = delSubscription(baseKey+"subscriptions", subsIdStr, true)
Simon Pastor
committed
func repopulateAssocStaSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
// Format response
err := json.Unmarshal([]byte(jsonInfo), &subscription)
if err != nil {
return err
}
selfUrl := strings.Split(subscription.Links.Self.Href, "/")
subsIdStr := selfUrl[len(selfUrl)-1]
subsId, _ := strconv.Atoi(subsIdStr)
assocStaSubscriptionInfoMap[subsId].Subscription = &subscription
assocStaSubscriptionInfoMap[subsId].NotificationCheckReady = false //do not send right away, immediateCheck flag for that
if subscription.ExpiryDeadline != nil {
intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
intList = append(intList, subsId)
subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
}
//reinitialisation of next available Id for future subscription request
if subsId >= nextSubscriptionIdAvailable {
nextSubscriptionIdAvailable = subsId + 1
}
return nil
}
Simon Pastor
committed
func repopulateStaDataRateSubscriptionMap(key string, jsonInfo string, userData interface{}) error {
var subscription StaDataRateSubscription
// Format response
err := json.Unmarshal([]byte(jsonInfo), &subscription)
if err != nil {
return err
}
selfUrl := strings.Split(subscription.Links.Self.Href, "/")
subsIdStr := selfUrl[len(selfUrl)-1]
subsId, _ := strconv.Atoi(subsIdStr)
mutex.Lock()
defer mutex.Unlock()
staDataRateSubscriptionInfoMap[subsId].Subscription = &subscription
staDataRateSubscriptionInfoMap[subsId].NotificationCheckReady = false //do not send right away, immediateCheck flag for that
Simon Pastor
committed
if subscription.ExpiryDeadline != nil {
intList := subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)]
intList = append(intList, subsId)
subscriptionExpiryMap[int(subscription.ExpiryDeadline.Seconds)] = intList
}
//reinitialisation of next available Id for future subscription request
if subsId >= nextSubscriptionIdAvailable {
nextSubscriptionIdAvailable = subsId + 1
}
return nil
}
func checkAssocStaNotificationRegisteredSubscriptions(staMacIds []string, apMacId string, needMutex bool) {
if needMutex {
mutex.Lock()
defer mutex.Unlock()
}
for subsId, subInfo := range assocStaSubscriptionInfoMap {
if subInfo == nil {
break
}
sub := subInfo.Subscription
Simon Pastor
committed
if sub.ApId.Bssid == apMacId {
Simon Pastor
committed
if match {
if sub.NotificationEvent != nil {
match = false
switch sub.NotificationEvent.Trigger {
Simon Pastor
committed
if len(staMacIds) >= int(sub.NotificationEvent.Threshold) {
match = true
}
Simon Pastor
committed
if len(staMacIds) <= int(sub.NotificationEvent.Threshold) {
match = true
}
default:
}
//if the notification already triggered, do not send it again unless its a periodic event
if match {
if sub.NotificationPeriod == 0 {
if subInfo.Triggered {
sendingNotificationAllowed = false
}
}
} else {
// no match found for threshold, reste trigger
assocStaSubscriptionInfoMap[subsId].Triggered = false
}
Simon Pastor
committed
}
}
if match && sendingNotificationAllowed {
assocStaSubscriptionInfoMap[subsId].Triggered = true
subsIdStr := strconv.Itoa(subsId)
log.Info("Sending WAIS notification ", sub.CallbackReference)
Simon Pastor
committed
apId.Bssid = apMacId
notif.ApId = &apId
for _, staMacId := range staMacIds {
staId.MacId = staMacId
notif.StaId = append(notif.StaId, staId)
}
log.Info("Assoc Sta Notification" + "(" + subsIdStr + ")")
assocStaSubscriptionInfoMap[subsId].NextTts = subInfo.Subscription.NotificationPeriod
assocStaSubscriptionInfoMap[subsId].NotificationCheckReady = false
func checkStaDataRateNotificationRegisteredSubscriptions(staId *StaIdentity, dataRateDl int32, dataRateUl int32, needMutex bool) {
Simon Pastor
committed
if needMutex {
mutex.Lock()
defer mutex.Unlock()
}
Simon Pastor
committed
//check all that applies
for subsId, subInfo := range staDataRateSubscriptionInfoMap {
if subInfo == nil {
break
}
sub := subInfo.Subscription
Simon Pastor
committed
match := false
if sub != nil {
Simon Pastor
committed
notifToSend := false
var staDataRateList []StaDataRate
for _, subStaId := range sub.StaId {
//check to match every values and at least one when its an array
if staId.MacId != subStaId.MacId {
continue
}
if staId.Aid != subStaId.Aid {
continue
}
Simon Pastor
committed
match = true
Simon Pastor
committed
for _, ssid := range subStaId.Ssid {
match = false
//can only have one ssid at a time
if ssid == staId.Ssid[0] {
match = true
break
}