/* * Copyright (c) 2024 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package iotmgr import ( "errors" "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" sssmgr "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sss-mgr" ) // IOT Manager type IotMgr struct { name string namespace string mutex sync.Mutex wg sync.WaitGroup refreshTicker *time.Ticker } type IotPlatformInfo struct { IotPlatformId string UserTransportInfo []MbTransportInfo CustomServicesTransportInfo []TransportInfo Enabled bool oneM2M *sssmgr.SssMgr } type MbTransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo *ImplSpecificInfo } type TransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo string } type EndPointInfo struct { Uris []string Fqdn []string Addresses []Addresses Alternative string } type Addresses struct { Host string Port int32 } type SecurityInfo struct { OAuth2Info *OAuth2Info Extensions string } type OAuth2Info struct { GrantTypes []string TokenEndpoint string } type ImplSpecificInfo struct { EventTopics []string UplinkTopics []string DownlinkTopics []string } type TrafficRuleDescriptor struct { TrafficRuleId string FilterType string Priority int32 TrafficFilter []TrafficFilter Action string DstInterface *InterfaceDescriptor } type InterfaceDescriptor struct { InterfaceType string //TunnelInfo *TunnelInfo FSCOM Not supported SrcMACAddress string DstMACAddress string DstIPAddress string } type TrafficFilter struct { SrcAddress []string DstAddress []string SrcPort []string DstPort []string Protocol []string Tag []string Uri []string PacketLabel []string SrcTunnelAddress []string TgtTunnelAddress []string SrcTunnelPort []string DstTunnelPort []string QCI int32 TC int32 } type KeyValuePair struct { Key string Value string } type DeviceInfo struct { DeviceAuthenticationInfo string DeviceMetadata []KeyValuePair Gpsi string Pei string Supi string Msisdn string Imei string Imsi string Iccid string DeviceId string RequestedMecTrafficRule []TrafficRuleDescriptor RequestedIotPlatformId string RequestedUserTransportId string //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats //DownlinkInfo *DownlinkInfo ClientCertificate string Enabled bool } var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform var devicesMap = map[string]DeviceInfo{} // Map device by deviceId var devicesPerPlatformMap = map[string][]string{} // Map deviceIds per platform var platformPerUserTransportIdMap = map[string][]string{} // Map userTransportId per platform // Timer to refresh devices list for all IoT platform const refreshTickerExpeary = 30 // In seconds // Enable profiling const profiling = false var profilingTimers map[string]time.Time const ( headerAccept = "application/json" headerContentType = "application/json" ) // NewIotMgr - Creates and initializes a new IOT Traffic Manager func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err } // Create new Traffic Manager tm = new(IotMgr) tm.name = name if namespace != "" { tm.namespace = namespace } else { tm.namespace = "default" } tm.init() return tm, nil } // Profiling init func (tm *IotMgr) init() { if profiling { profilingTimers = make(map[string]time.Time) } registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 0) devicesMap = make(map[string]DeviceInfo, 0) devicesPerPlatformMap = make(map[string][]string, 0) platformPerUserTransportIdMap = make(map[string][]string, 0) tm.refreshTicker = nil } // DeleteIotMgr - func (tm *IotMgr) DeleteIotMgr() (err error) { return nil } func (tm *IotMgr) startRefreshTicker() { log.Debug(">>> startRefreshTicker") tm.refreshTicker = time.NewTicker(time.Duration(refreshTickerExpeary) * time.Second) go func() { if tm.refreshTicker != nil { for range tm.refreshTicker.C { // Refresh the list of devices tm.wg.Add(1) log.Debug("startRefreshTicker: registeredIotPlatformsMap: ", registeredIotPlatformsMap) for _, v := range registeredIotPlatformsMap { if v.oneM2M != nil { err := tm.populateDevicesPerIotPlatforms(v) if err != nil { log.Error(err) } } else { log.Debug("startRefreshTicker: Nothing to do") } } // End of 'for' statement log.Debug("startRefreshTicker: Before Done()") tm.wg.Done() log.Debug("startRefreshTicker: After Done()") } // End of 'for' statement log.Debug("startRefreshTicker: Leaving time loop") } }() } func (tm *IotMgr) stopRefreshTicker() { if tm.refreshTicker != nil { // Refresh the list of devices tm.wg.Add(1) tm.refreshTicker.Stop() tm.refreshTicker = nil tm.wg.Done() log.Debug("Refresh loop stopped") } } func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() } log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo) if iotPlatformInfo.Enabled { //{{\"iotPlatformId\": \"1a584db5-6a3e-4f56-b126-29180069ecf1\", \"userTransportInfo\": [{\"id\": \"ca22ca5e-e0ce-4da8-a2ce-2966f4759032\", \"name\": \"MQTT\", \"description\": \"MQTT\", \"type\": \"MB_TOPIC_BASED\", \"protocol\": \"MQTT\", \"version\": \"2\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.56\", \"port\": 1883}]}, \"security\": {}, \"implSpecificInfo\": {}}], \"customServicesTransportInfo\": [{\"id\": \"85fe5e7f-c371-4f71-b7f6-61a1f808fbb3\", \"name\": \"/laboai-acme-ic-cse\", \"description\": \"ACME oneM2M CSE\", \"type\": \"REST_HTTP\", \"protocol\": \"REST_HTTP\", \"version\": \"4\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.20\", \"port\": 31110}]}, \"security\": {}}], \"enabled\": true}} iotPlatformInfo.oneM2M = nil if len(iotPlatformInfo.CustomServicesTransportInfo) == 0 || iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint == nil || len(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses) == 0 { log.Warn("RegisterIotPlatformInfo: Cannot use provided CustomServicesTransportInfo") } else { // FIXME FSCOM How to get the CSE_ID // TODO FSCOM Add notification support? pltf, err := sssmgr.NewSssMgr(tm.name, tm.namespace, iotPlatformInfo.CustomServicesTransportInfo[0].Protocol /*"MQTT"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Host /*"172.29.10.56"*/, int(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Port) /*1883*/, iotPlatformInfo.IotPlatformId /*"7feaadbb0400"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Name /*"laboai-acme-ic-cse"*/, nil, nil, nil) if err != nil { log.Error("RegisterIotPlatformInfo: ", err) iotPlatformInfo.oneM2M = nil } else { log.Info("RegisterIotPlatformInfo: IoT pltf created") iotPlatformInfo.oneM2M = pltf if tm.refreshTicker == nil { log.Info("RegisterIotPlatformInfo: Start RefreshTicker") tm.startRefreshTicker() } } } registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo log.Info("RegisterIotPlatformInfo: iotPlatformId: ", registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId]) } // else, Skip disabled platform if profiling { now := time.Now() log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { if profiling { profilingTimers["DeregisterIotPlatformInfo"] = time.Now() } log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId) // Remove the list of the devices for this IoT platform if val, ok := devicesPerPlatformMap[iotPlatformId]; ok { // Free resources from devicesMap map for _, dev := range val { delete(devicesMap, dev) } // End of 'for' statement delete(devicesPerPlatformMap, iotPlatformId) log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap) for _, rule := range platformPerUserTransportIdMap { for idx, pltf := range rule { if pltf == iotPlatformId { rule = append(rule[:idx], rule[idx+1:]...) } } // End of 'for' statement } // End of 'for' statement log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap) } if pltf, ok := registeredIotPlatformsMap[iotPlatformId]; ok { if pltf.oneM2M != nil { _ = pltf.oneM2M.DeleteSssMgr() pltf.oneM2M = nil log.Info("RegisterIotPlatformInfo: IoT pltf removed") } delete(registeredIotPlatformsMap, iotPlatformId) if len(registeredIotPlatformsMap) == 0 { if tm.refreshTicker != nil { log.Info("RegisterIotPlatformInfo: Stop RefreshTicker") tm.stopRefreshTicker() tm.refreshTicker = nil } } } if profiling { now := time.Now() log.Debug("DeregisterIotPlatformInfo: ", now.Sub(profilingTimers["DeregisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { if profiling { profilingTimers["GetDevices"] = time.Now() } log.Info(">>> GetDevices") tm.wg.Wait() log.Info("GetDevices: After Wait()") devices = make([]DeviceInfo, 0) if len(registeredIotPlatformsMap) == 0 { return devices, nil } for _, v := range devicesMap { log.Info("GetDevices: adding device: ", v) devices = append(devices, v) } // End of 'for' statement log.Info("GetDevices: devices: ", devices) if profiling { now := time.Now() log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"])) } return devices, nil } func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { if profiling { profilingTimers["GetDevice"] = time.Now() } log.Info(">>> GetDevice: deviceId: ", deviceId) tm.wg.Wait() log.Info("GetDevices: After Wait()") if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err } else { device = val } if profiling { now := time.Now() log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"])) } log.Info("GetDevice: device: ", device) return device, nil } func (tm *IotMgr) CreateDevice(device DeviceInfo) (deviceResp DeviceInfo, err error) { log.Info(">>> CreateDevice: ", device) tm.wg.Wait() log.Info("GetDevices: After Wait()") // RequestedMecTrafficRule is not supported yet if len(device.RequestedMecTrafficRule) != 0 { err = errors.New("Unsupported traffic rule provided") log.Error(err.Error()) return deviceResp, err } if len(device.RequestedIotPlatformId) != 0 { deviceResp, err = tm.createDeviceWithIotPlatformId(device, device.RequestedIotPlatformId) } else { deviceResp, err = tm.createDeviceWithRequestedUserTransportId(device, device.RequestedUserTransportId) } if err != nil { log.Error(err.Error()) return deviceResp, err } log.Info("CreateDevice: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) DeleteDevice(deviceId string) (err error) { if profiling { profilingTimers["DeleteDevice"] = time.Now() } log.Info(">>> DeleteDevice: device: ", deviceId) tm.wg.Wait() log.Info("GetDevices: After Wait()") if _, ok := devicesMap[deviceId]; !ok { err = errors.New("Invalid device identifier") log.Error(err.Error()) return err } device := devicesMap[deviceId] // Remove the list of the devices for this IoT platform if val, ok := devicesPerPlatformMap[device.RequestedIotPlatformId]; ok { // Free resource from devicesMap map log.Info("DeleteDevice: devicesPerPlatformMap (before): ", devicesPerPlatformMap) for idx, devId := range val { if devId == device.DeviceId { val = append(val[:idx], val[idx+1:]...) break } } // End of 'for' statement } log.Info("DeleteDevice: devicesPerPlatformMap (after): ", devicesPerPlatformMap) // Free resource from devicesMap map log.Info("DeleteDevice: devicesMap (before): ", devicesMap) delete(devicesMap, device.DeviceId) log.Info("DeleteDevice: devicesMap (after): ", devicesMap) if profiling { now := time.Now() log.Debug("DeleteDevice: ", now.Sub(profilingTimers["DeleteDevice"])) } return nil } func (tm *IotMgr) createDeviceWithIotPlatformId(device DeviceInfo, requestedIotPlatformId string) (deviceResp DeviceInfo, err error) { log.Info(">>> createDeviceWithIotPlatformId: ", device) // Sanity checks if _, ok := registeredIotPlatformsMap[requestedIotPlatformId]; !ok { err = errors.New("Invalid IotPlatform identifier") return deviceResp, err } if _, ok := devicesMap[device.DeviceId]; ok { err = errors.New("Device already exist") return deviceResp, err } if registeredIotPlatformsMap[requestedIotPlatformId].oneM2M != nil && device.Enabled == true { log.Info("createDeviceWithIotPlatformId: Create device on IoT platform", device) var sensor = sssmgr.SensorDiscoveryInfo{ SensorIdentifier: device.DeviceId, SensorType: "CNT", // FIXME FSCOM How to retrieve this info SensorPosition: nil, IotPlatformId: requestedIotPlatformId, } if len(device.DeviceMetadata) != 0 { sensor.SensorCharacteristicList = make([]sssmgr.SensorCharacteristic, len(device.DeviceMetadata)) for i, c := range device.DeviceMetadata { sensor.SensorCharacteristicList[i] = sssmgr.SensorCharacteristic{CharacteristicName: c.Key, CharacteristicValue: c.Value} } // End of 'for' statement } // FIXME FSCOM How to manage these fields from DeviceInfo // DeviceAuthenticationInfo string // Gpsi string // Pei string // Supi string // Msisdn string // Imei string // Imsi string // Iccid string // RequestedMecTrafficRule []TrafficRuleDescriptor // //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats // //DownlinkInfo *DownlinkInfo // ClientCertificate string // } sensor, err := registeredIotPlatformsMap[requestedIotPlatformId].oneM2M.OneM2M_create(sensor, requestedIotPlatformId, sensor.SensorType) if err != nil { return deviceResp, err } } devicesMap[device.DeviceId] = device devicesPerPlatformMap[device.DeviceId] = append(devicesPerPlatformMap[device.DeviceId], requestedIotPlatformId) platformPerUserTransportIdMap[requestedIotPlatformId] = append(platformPerUserTransportIdMap[requestedIotPlatformId], device.RequestedUserTransportId) deviceResp = device log.Debug("createDeviceWithIotPlatformId: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) createDeviceWithRequestedUserTransportId(device DeviceInfo, requestedUserTransportId string) (deviceResp DeviceInfo, err error) { log.Info(">>> createDeviceWithRequestedUserTransportId: ", device) if val, ok := platformPerUserTransportIdMap[requestedUserTransportId]; ok { deviceResp, err = tm.createDeviceWithIotPlatformId(device, val[0]) } else { err = errors.New("Invalid UserTransportId") } if err != nil { log.Error("createDeviceWithIotPlatformId: ", err.Error()) return deviceResp, err } log.Info("createDeviceWithIotPlatformId: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) resetMaps(iotPlatformId string) { log.Info(">>> resetMaps: ", iotPlatformId) // Free resources from devicesMap map // Remove all devices for this IoT platform log.Info("resetMaps: devicesMap (before): ", devicesMap) for _, deviceId := range devicesPerPlatformMap[iotPlatformId] { delete(devicesMap, deviceId) } // End of 'for' statement log.Info("resetMaps: devicesMap (after): ", devicesMap) // Remove all devices for this IoT platform log.Info("resetMaps: devicesPerPlatformMap (before): ", devicesPerPlatformMap) delete(devicesPerPlatformMap, iotPlatformId) log.Info("resetMaps: devicesPerPlatformMap (after): ", devicesPerPlatformMap) log.Info("resetMaps: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap) for _, rule := range platformPerUserTransportIdMap { for idx, pltf := range rule { if pltf == iotPlatformId { rule = append(rule[:idx], rule[idx+1:]...) } } // End of 'for' statement } // End of 'for' statement log.Info("resetMaps: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap) } func (tm *IotMgr) populateDevicesPerIotPlatforms(iotPlatformInfo IotPlatformInfo) (err error) { log.Info(">>> populateDevicesPerIotPlatforms: ", iotPlatformInfo) if iotPlatformInfo.oneM2M == nil { log.Info("populateDevicesPerIotPlatforms: Nothing to do") return nil } // Reset maps tm.resetMaps(iotPlatformInfo.IotPlatformId) sensors, err := iotPlatformInfo.oneM2M.SensorDiscoveryInfoAll() if err != nil { log.Error("populateDevicesPerIotPlatforms: ", err) return err } log.Info("populateDevicesPerIotPlatforms: sensors: ", sensors) for _, sensor := range sensors { var deviceInfo = DeviceInfo{ DeviceId: sensor.SensorIdentifier, Enabled: true, } deviceInfo.DeviceMetadata = make([]KeyValuePair, len(sensor.SensorCharacteristicList)) for i, c := range sensor.SensorCharacteristicList { deviceInfo.DeviceMetadata[i] = KeyValuePair{Key: c.CharacteristicName, Value: c.CharacteristicValue} } // End of 'for' statement devicesMap[deviceInfo.DeviceId] = deviceInfo devicesPerPlatformMap[iotPlatformInfo.IotPlatformId] = append(devicesPerPlatformMap[iotPlatformInfo.IotPlatformId], deviceInfo.DeviceId) } // End of 'for' statement log.Info("populateDevicesPerIotPlatforms: devicesMap: ", devicesMap) log.Info("populateDevicesPerIotPlatforms: devicesPerPlatformMap: ", devicesPerPlatformMap) return nil }