Newer
Older
/*
* Copyright (c) 2024 The AdvantEDGE Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"net/http"
"reflect"
"strconv"
"strings"
"sync"
"time"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
uuid "github.com/google/uuid"
)
// SssMgr is the Sensors-Sharing Service manager. It holds the connector
// identity, the binding settings selected when the manager is created, the
// refresh ticker and the notification callbacks supplied by the caller.
type SssMgr struct {
	name      string
	namespace string

	// Binding settings; NewSssMgr assigns all three from its arguments.
	bindingProtocol string
	host            string
	port            int

	mutex sync.Mutex
	// wg is incremented/decremented around each refresh performed by the
	// refresh loop.
	wg            sync.WaitGroup
	refreshTicker *time.Ticker

	// Notification callbacks provided to NewSssMgr.
	sss_discovery_notify func()
	sss_status_notify    func()
	sss_data_notify      func()
}

// IotPlatformInfo describes one registered IoT platform. It is the value type
// of registeredIotPlatformsMap.
type IotPlatformInfo struct {
	Address       string
	Port          int
	Name          string
	IotPlatformId string
}
// Point is a latitude/longitude pair.
type Point struct {
	Latitude, Longitude float64
}
// SensorCharacteristic is one named characteristic of a sensor together with
// its value, both carried as strings.
type SensorCharacteristic struct {
	CharacteristicName, CharacteristicValue string

	// CharacteristicUnitOfMeasure is a pointer and may therefore be nil.
	CharacteristicUnitOfMeasure *string
}
// SensorDiscoveryInfo groups what is known about one sensor: its identifier
// and type, its property and characteristic lists, a position and the
// identifier of an IoT platform.
type SensorDiscoveryInfo struct {
	SensorIdentifier, SensorType string

	SensorPropertyList       []string
	SensorCharacteristicList []SensorCharacteristic

	// SensorPosition is a pointer and may therefore be nil.
	SensorPosition *Point

	IotPlatformId string
}
// Package-level registries of platforms and sensors.
var (
	// registeredIotPlatformsMap holds the discovered IoT platforms.
	registeredIotPlatformsMap = make(map[string]IotPlatformInfo)

	// sensorsMap maps sensors by sensorIdentifier.
	sensorsMap = make(map[string]SensorDiscoveryInfo)

	// sensorsPerPlatformMap maps the sensor identifiers per platform.
	sensorsPerPlatformMap = make(map[string][]string)
)
const (
	// refreshTickerExpeary is the period, in seconds, of the timer that
	// refreshes the devices list for all IoT platforms.
	refreshTickerExpeary = 10

	// profiling enables profiling when set to true.
	profiling = false
)

// profilingTimers is declared without an initial value, so it starts out nil.
var profilingTimers map[string]time.Time
// Hard-coded description of a single IoT platform: address, port, name and
// identifier.
const (
	iot_platform_address = "lab-oai.etsi.org"
	iot_platform_port    = 31110
	iot_platform_name    = "laboai-acme-ic-cse"
	iot_platform_id      = "7feaadbb0400"
)
// NewSssMgr - Creates and initializes a new SSS Traffic Manager
//
// Parameters:
//   - name: connector name; an empty name is rejected.
//   - namespace: stored on the manager; "default" is used when empty.
//   - bindingProtocol: "MQTT" or "HTTP"; any other value is rejected.
//   - host: stored on the manager; must be non-empty for "MQTT".
//   - port: stored on the manager; for "MQTT" a zero port becomes 1883.
//   - sss_discovery_notify, sss_status_notify, sss_data_notify: callbacks
//     stored on the manager.
//
// Returns the initialized manager, or nil and an error when validation or the
// protocol handler initialization fails.
func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) {
	if name == "" {
		err = errors.New("Missing connector name")
		return nil, err
	}

	// Create new Traffic Manager. The named result starts out nil, so it must
	// be allocated before any field is assigned.
	tm = new(SssMgr)
	tm.name = name
	if namespace != "" {
		tm.namespace = namespace
	} else {
		tm.namespace = "default"
	}

	tm.bindingProtocol = bindingProtocol
	tm.host = host
	tm.port = port

	// Select the protocol handler matching the requested binding.
	switch tm.bindingProtocol {
	case "MQTT":
		if tm.host == "" {
			err = errors.New("Host not set for MQTTP protocol")
			log.Error(err.Error())
			return nil, err
		}
		if tm.port == 0 {
			tm.port = 1883
		}
		protocol = NewSssMgrMqtt()
	case "HTTP":
		protocol = NewSssMgrHttp()
	default:
		err = errors.New("Binding protocol not set")
		log.Error(err.Error())
		return nil, err
	}

	err = protocol.init(tm)
	if err != nil {
		log.Error(err.Error())
		return nil, err
	}

	tm.sss_discovery_notify = sss_discovery_notify
	tm.sss_status_notify = sss_status_notify
	tm.sss_data_notify = sss_data_notify

	tm.init()

	return tm, nil
}
// NOTE(review): in this view the statements below are not enclosed by any
// function header or closing brace. Presumably they are the body of an
// initialization routine (NewSssMgr calls tm.init()) — confirm against the
// complete file.
//
// Profiling init: the timers map is only allocated when profiling is enabled.
if profiling {
profilingTimers = make(map[string]time.Time)
}
// Recreate the platform registry and register the hard-coded platform, using
// its address as the map key.
registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 1)
registeredIotPlatformsMap[iot_platform_address] = IotPlatformInfo{ // FIXME FSCOM How to register IoT platform to meep-sss (see meep-iot?)
Address: iot_platform_address,
Port: iot_platform_port,
Name: iot_platform_name,
IotPlatformId: iot_platform_id,
}
// Replace both sensor maps with fresh, empty maps.
sensorsMap = make(map[string]SensorDiscoveryInfo, 0)
sensorsPerPlatformMap = make(map[string][]string, 0)
// DeleteSssMgr -
func (tm *SssMgr) DeleteSssMgr() (err error) {
if protocol != nil {
protocol.uninit()
protocol = nil
}
// startRefreshTicker creates the refresh ticker with a period of
// refreshTickerExpeary seconds and starts a goroutine that repopulates the
// devices of the IoT platforms on every tick. A failing refresh is logged and
// the loop continues with the next tick.
//
// NOTE(review): time.Ticker.Stop does not close the ticker channel, so
// stopping the ticker leaves this goroutine blocked in the range loop rather
// than returning — confirm this is acceptable.
func (tm *SssMgr) startRefreshTicker() {
	log.Debug("Starting refresh loop")
	tm.refreshTicker = time.NewTicker(refreshTickerExpeary * time.Second)

	// refreshOnce performs one refresh, bracketed by the wait group so the
	// in-flight refresh is tracked.
	refreshOnce := func() {
		tm.wg.Add(1)
		if err := tm.populateDevicesPerIotPlatforms(); err != nil {
			log.Error(err)
		}
		tm.wg.Done()
	}

	go func() {
		// The ticker field is read inside the goroutine; bail out if it has
		// already been cleared.
		if tm.refreshTicker == nil {
			return
		}
		for range tm.refreshTicker.C {
			// Refresh the list of devices
			refreshOnce()
		}
	}()
}
func (tm *SssMgr) stopRefreshTicker() {
if tm.refreshTicker != nil {
// Refresh the list of devices
Loading
Loading full blame…