/* * Copyright (c) 2024 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package iotmgr import ( "encoding/json" "errors" "fmt" "io" "io/ioutil" "net/http" "strconv" "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" uuid "github.com/google/uuid" "github.com/gorilla/mux" ) // IOT Manager type IotMgr struct { name string namespace string mutex sync.Mutex } type IotPlatformInfo struct { IotPlatformId string UserTransportInfo []MbTransportInfo CustomServicesTransportInfo []TransportInfo Enabled bool } type MbTransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo *ImplSpecificInfo } type TransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo string } type EndPointInfo struct { Uris []string Fqdn []string Addresses []Addresses Alternative string } type Addresses struct { Host string Port int32 } type SecurityInfo struct { OAuth2Info *OAuth2Info Extensions string } type OAuth2Info struct { GrantTypes []string TokenEndpoint string } type ImplSpecificInfo struct { EventTopics []string UplinkTopics []string DownlinkTopics []string } type DeviceInfo struct { DeviceAuthenticationInfo string //DeviceMetadata []KeyValuePair Gpsi string Pei string Supi string Msisdn string Imei string Imsi string Iccid string DeviceId string //RequestedMecTrafficRule []TrafficRuleDescriptor RequestedIotPlatformId string RequestedUserTransportId string //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats //DownlinkInfo *DownlinkInfo ClientCertificate string Enabled bool } var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform var devicesMap = map[string]DeviceInfo{} // Map device by deviceId var devicesPerPlatformMap = map[string][]string{} // Map deviceIds per platform // Enable profiling const profiling = false var profilingTimers map[string]time.Time const ( headerAccept = "application/json" headerContentType = "application/json" ) // Profiling init func init() { if profiling { profilingTimers = make(map[string]time.Time) } } // NewIotMgr - Creates and initializes a new IOT Traffic Manager func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err } // Create new Traffic Manager tm = new(IotMgr) tm.name = name if namespace != "" { tm.namespace = namespace } else { tm.namespace = "default" } return tm, nil } // DeleteIotMgr - func (tm *IotMgr) DeleteIotMgr() (err error) { return nil } func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() } log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo) if iotPlatformInfo.Enabled { registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo } // else, Skip disabled platform if profiling { now := time.Now() log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { if profiling { profilingTimers["DeregisterIotPlatformInfo"] = time.Now() } log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId) // Remove the list of the devices for this IoT platform if val, ok := devicesPerPlatformMap[iotPlatformId]; ok { // Free resources from devicesMap map for _, dev := range val { delete(devicesMap, dev) } // End of 'for' statement delete(devicesPerPlatformMap, iotPlatformId) } if _, ok := registeredIotPlatformsMap[iotPlatformId]; ok { delete(registeredIotPlatformsMap, iotPlatformId) } if profiling { now := time.Now() log.Debug("DeregisterIotPlatformInfo: ", now.Sub(profilingTimers["DeregisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { if profiling { profilingTimers["GetDevices"] = time.Now() } log.Info(">>> GetDevices: iotPlatformId") if len(registeredIotPlatformsMap) == 0 { return devices, nil } // Refresh the list of devices for _, val := range registeredIotPlatformsMap { err := populateDevices(val.IotPlatformId) if err != nil { continue } } log.Info("GetDevices: devices: ", devices) if profiling { now := time.Now() log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"])) } return devices, nil } func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { if profiling { profilingTimers["GetDevice"] = time.Now() } log.Info(">>> GetDevice: deviceId: ", deviceId) if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err } else { log.Info(" GetDevice: device: ", val) device = val } if profiling { now := time.Now() log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"])) } return device, nil } /* * func (tm *IotMgr) PopulateDevices IoT devices for the specified Iot platform * @param {string} iotPlatformId contains the IoT platform identifier * @return {struct} nil on success, error otherwise */ func populateDevices(iotPlatformId string) error { if profiling { profilingTimers["populateDevices"] = time.Now() } log.Info(">>> populateDevices: iotPlatformId=", iotPlatformId) // Build the URL t := registeredIotPlatformsMap[iotPlatformId].CustomServicesTransportInfo[0] log.Info("populateDevices: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name // Builkd the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["ContentType"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Send the request response, err := sendRequest("GET", url, headers, nil, nil, nil, 200) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: response: ", string(response)) var oneM2M_data map[string]interface{} err = json.Unmarshal(response, &oneM2M_data) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: oneM2M_data: ", len(oneM2M_data)) log.Debug(oneM2M_data) for k := range oneM2M_data { log.Info("populateDevices: Processing key: ", k) var device = DeviceInfo{ DeviceId: "", RequestedIotPlatformId: "", } // type DeviceInfo struct { // DeviceAuthenticationInfo string // //DeviceMetadata []KeyValuePair // Gpsi string // Pei string // Supi string // Msisdn string // Imei string // Imsi string // Iccid string // DeviceId string // //RequestedMecTrafficRule []TrafficRuleDescriptor // RequestedIotPlatformId string // RequestedUserTransportId string // //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats // //DownlinkInfo *DownlinkInfo // ClientCertificate string // Enabled bool // } devicesMap[device.DeviceId] = device devicesPerPlatformMap[device.RequestedIotPlatformId] = append(devicesPerPlatformMap[device.RequestedIotPlatformId], device.DeviceId) } // End of 'for' statement // for _, val := range devices { // } // log.Info("populateDevices: devices: ", devices) if profiling { now := time.Now() log.Debug("populateDevices: ", now.Sub(profilingTimers["populateDevices"])) } return nil } func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) { log.Debug(">>> sendRequest: url: ", url) log.Debug(">>> sendRequest: headers: ", headers) req, err := http.NewRequest(method, url, body) if err != nil || req == nil { return nil, err } if vars != nil { req = mux.SetURLVars(req, vars) } if query != nil { q := req.URL.Query() for k, v := range query { q.Add(k, v) } req.URL.RawQuery = q.Encode() } req.Header = headers req.Close = true rr, err := http.DefaultClient.Do(req) if err != nil { return nil, err } // Check the status code is what we expect. if status := rr.StatusCode; status != code { s := fmt.Sprintf("Wrong status code - got %v want %v", status, code) return nil, errors.New(s) } responseData, err := ioutil.ReadAll(rr.Body) if err != nil { return nil, err } return responseData, nil }