/* * Copyright (c) 2024 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package iotmgr import ( "encoding/json" "errors" "fmt" "io" "io/ioutil" "net/http" "reflect" "strconv" "strings" "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" uuid "github.com/google/uuid" "github.com/gorilla/mux" ) // IOT Manager type IotMgr struct { name string namespace string mutex sync.Mutex } type IotPlatformInfo struct { IotPlatformId string UserTransportInfo []MbTransportInfo CustomServicesTransportInfo []TransportInfo Enabled bool } type MbTransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo *ImplSpecificInfo } type TransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo string } type EndPointInfo struct { Uris []string Fqdn []string Addresses []Addresses Alternative string } type Addresses struct { Host string Port int32 } type SecurityInfo struct { OAuth2Info *OAuth2Info Extensions string } type OAuth2Info struct { GrantTypes []string TokenEndpoint string } type ImplSpecificInfo struct { EventTopics []string UplinkTopics []string DownlinkTopics []string } // ETSI GS MEC 046 V3.1.1 (2024-04) Clause 6.2.1 Type: SensorDiscoveryInfo type SensorCharacteristic struct { CharacteristicName string CharacteristicValue string CharacteristicUnitOfMeasure *string } type DeviceInfo struct { DeviceAuthenticationInfo string //DeviceMetadata []KeyValuePair Gpsi string Pei string Supi string Msisdn string Imei string Imsi string Iccid string DeviceId string //RequestedMecTrafficRule []TrafficRuleDescriptor RequestedIotPlatformId string RequestedUserTransportId string //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats //DownlinkInfo *DownlinkInfo ClientCertificate string Enabled bool SensorIdentifier string SensorStatusType string SensorPropertyList []string SensorCharacteristicList []SensorCharacteristic } var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform var devicesMap = map[string]DeviceInfo{} // Map device by deviceId var devicesPerPlatformMap = map[string][]string{} // Map deviceIds per platform // Enable profiling const profiling = false var profilingTimers map[string]time.Time const ( headerAccept = "application/json" headerContentType = "application/json" ) // Profiling init func init() { if profiling { profilingTimers = make(map[string]time.Time) } } // NewIotMgr - Creates and initializes a new IOT Traffic Manager func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err } // Create new Traffic Manager tm = new(IotMgr) tm.name = name if namespace != "" { tm.namespace = namespace } else { tm.namespace = "default" } return tm, nil } // DeleteIotMgr - func (tm *IotMgr) DeleteIotMgr() (err error) { return nil } func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() } log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo) if iotPlatformInfo.Enabled { registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo } // else, Skip disabled platform if profiling { now := time.Now() log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { if profiling { profilingTimers["DeregisterIotPlatformInfo"] = time.Now() } log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId) // Remove the list of the devices for this IoT platform if val, ok := devicesPerPlatformMap[iotPlatformId]; ok { // Free resources from devicesMap map for _, dev := range val { delete(devicesMap, dev) } // End of 'for' statement delete(devicesPerPlatformMap, iotPlatformId) } if _, ok := registeredIotPlatformsMap[iotPlatformId]; ok { delete(registeredIotPlatformsMap, iotPlatformId) } if profiling { now := time.Now() log.Debug("DeregisterIotPlatformInfo: ", now.Sub(profilingTimers["DeregisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { if profiling { profilingTimers["GetDevices"] = time.Now() } log.Info(">>> GetDevices") devices = make([]DeviceInfo, 0) if len(registeredIotPlatformsMap) == 0 { return devices, nil } // Refresh the list of devices for _, iotPlatform := range registeredIotPlatformsMap { log.Info("GetDevices: processing: ", iotPlatform.IotPlatformId) err := populateDevices(iotPlatform) if err != nil { log.Error("GetDevices: ", err) continue } for _, v := range devicesPerPlatformMap[iotPlatform.IotPlatformId] { log.Info("GetDevices: adding device: ", v) devices = append(devices, devicesMap[v]) } } // End of 'for' statement log.Info("GetDevices: devices: ", devices) if profiling { now := time.Now() log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"])) } return devices, nil } func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { if profiling { profilingTimers["GetDevice"] = time.Now() } log.Info(">>> GetDevice: deviceId: ", deviceId) if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err } else { log.Info(" GetDevice: device: ", val) device = val } if profiling { now := time.Now() log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"])) } return device, nil } /* * func PopulateDevices IoT devices for the specified Iot platform * @param {string} iotPlatformId contains the IoT platform identifier * @return {struct} nil on success, error otherwise */ func populateDevices(iotPlatformInfo IotPlatformInfo) error { if profiling { profilingTimers["populateDevices"] = time.Now() } log.Info(">>> populateDevices: iotPlatformId=", iotPlatformInfo.IotPlatformId) // 1. Get the list of the AE // Build the URL t := registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].CustomServicesTransportInfo[0] log.Info("populateDevices: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name log.Info("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["ContentType"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for device // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: response: ", string(response)) var oneM2M_uril map[string][]string err = json.Unmarshal(response, &oneM2M_uril) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: oneM2M_uril: ", len(oneM2M_uril)) log.Debug(oneM2M_uril) if _, ok := oneM2M_uril["m2m:uril"]; !ok { err := errors.New("populateDevices: Key not found: m2m:uril") log.Error(err.Error()) return err } // Loop for each CIN and build the device list for _, v := range oneM2M_uril["m2m:uril"] { log.Info("populateDevices: Processing key: ", v) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + v log.Info("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["ContentType"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} queries["fu"] = "2" // Filter usage // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: response: ", string(response)) var oneM2M_cin map[string]map[string]interface{} err = json.Unmarshal(response, &oneM2M_cin) if err != nil { log.Error("populateDevices: ", err.Error()) continue } //log.Debug("populateDevices: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) //log.Debug("populateDevices: len(oneM2M_cin): ", len(oneM2M_cin)) //log.Debug("populateDevices: oneM2M_cin: ", oneM2M_cin) for _, m := range oneM2M_cin { //log.Debug("==> ", i, " value is ", m) var device = DeviceInfo{} device.RequestedIotPlatformId = iotPlatformInfo.IotPlatformId device.RequestedUserTransportId = registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].UserTransportInfo[0].Id device.Enabled = true // m is a map[string]interface. // loop over keys and values in the map. for k, v := range m { log.Debug(k, " value is ", v) log.Debug("populateDevices: type(v): ", reflect.TypeOf(v)) if k == "rn" { if item, ok := v.(string); ok { device.DeviceId = item } else { log.Error("populateDevices: Failed to process ", k) } } else if k == "ri" { if item, ok := v.(string); ok { device.SensorIdentifier = item } else { log.Error("populateDevices: Failed to process ", k) } } else if k == "ty" { if item, ok := v.(float64); ok { device.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64) } else { log.Error("populateDevices: Failed to process ", k) } } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" { if item, ok := v.(string); ok { device.SensorCharacteristicList = append( device.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: string(item), CharacteristicUnitOfMeasure: nil, }) } else if item, ok := v.(float64); ok { device.SensorCharacteristicList = append( device.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), CharacteristicUnitOfMeasure: nil, }) } else if item, ok := v.([]string); ok { device.SensorCharacteristicList = append( device.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strings.Join(item, ","), CharacteristicUnitOfMeasure: nil, }) } else { log.Error("populateDevices: Failed to process ", k) } } } // End of 'for' loop log.Info("populateDevices: device: ", device) devicesMap[device.DeviceId] = device devicesPerPlatformMap[device.RequestedIotPlatformId] = append(devicesPerPlatformMap[device.RequestedIotPlatformId], device.DeviceId) } // End of 'for' loop } // End of 'for' statement log.Info("populateDevices: devicesMap: ", devicesMap) log.Info("populateDevices: devicesPerPlatformMap: ", devicesPerPlatformMap) if profiling { now := time.Now() log.Debug("populateDevices: ", now.Sub(profilingTimers["populateDevices"])) } return nil } func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) { log.Debug(">>> sendRequest: url: ", url) log.Debug(">>> sendRequest: headers: ", headers) req, err := http.NewRequest(method, url, body) if err != nil || req == nil { return nil, err } if vars != nil { req = mux.SetURLVars(req, vars) } if query != nil { q := req.URL.Query() for k, v := range query { q.Add(k, v) } req.URL.RawQuery = q.Encode() } req.Header = headers req.Close = true rr, err := http.DefaultClient.Do(req) if err != nil { return nil, err } // Check the status code is what we expect. if status := rr.StatusCode; status != code { s := fmt.Sprintf("Wrong status code - got %v want %v", status, code) return nil, errors.New(s) } responseData, err := ioutil.ReadAll(rr.Body) if err != nil { return nil, err } return responseData, nil }