/* * Copyright (c) 2024 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package onem2mmgr import ( "bytes" "encoding/json" "errors" "fmt" "io" "io/ioutil" "net/http" "reflect" "strconv" "strings" "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" uuid "github.com/google/uuid" "github.com/gorilla/mux" ) // IOT Manager type IotMgr struct { name string namespace string mutex sync.Mutex wg sync.WaitGroup refreshTicker *time.Ticker } type IotPlatformInfo struct { IotPlatformId string UserTransportInfo []MbTransportInfo CustomServicesTransportInfo []TransportInfo Enabled bool DeviceInfo *DeviceInfo } type MbTransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo *ImplSpecificInfo } type TransportInfo struct { Id string Name string Description string Type_ *string Protocol string Version string Endpoint *EndPointInfo Security *SecurityInfo ImplSpecificInfo string } type EndPointInfo struct { Uris []string Fqdn []string Addresses []Addresses Alternative string } type Addresses struct { Host string Port int32 } type SecurityInfo struct { OAuth2Info *OAuth2Info Extensions string } type OAuth2Info struct { GrantTypes []string TokenEndpoint string } type ImplSpecificInfo struct { EventTopics []string UplinkTopics []string DownlinkTopics []string } // ETSI GS MEC 046 V3.1.1 (2024-04) Clause 6.2.1 Type: SensorDiscoveryInfo type SensorCharacteristic struct { CharacteristicName string CharacteristicValue string CharacteristicUnitOfMeasure *string } type TrafficRuleDescriptor struct { TrafficRuleId string FilterType string Priority int32 TrafficFilter []TrafficFilter Action string DstInterface *InterfaceDescriptor } type InterfaceDescriptor struct { InterfaceType string //TunnelInfo *TunnelInfo FSCOM Not supported SrcMACAddress string DstMACAddress string DstIPAddress string } type TrafficFilter struct { SrcAddress []string DstAddress []string SrcPort []string DstPort []string Protocol []string Tag []string Uri []string PacketLabel []string SrcTunnelAddress []string TgtTunnelAddress []string SrcTunnelPort []string DstTunnelPort []string QCI int32 TC int32 } type KeyValuePair struct { Key string Value string } type DeviceInfo struct { DeviceAuthenticationInfo string DeviceMetadata []KeyValuePair Gpsi string Pei string Supi string Msisdn string Imei string Imsi string Iccid string DeviceId string RequestedMecTrafficRule []TrafficRuleDescriptor RequestedIotPlatformId string RequestedUserTransportId string //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats //DownlinkInfo *DownlinkInfo ClientCertificate string Enabled bool // SensorIdentifier string // SensorStatusType string // SensorPropertyList []string // SensorCharacteristicList []SensorCharacteristic } var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform var devicesMap = map[string]DeviceInfo{} // Map device by deviceId var devicesPerPlatformMap = map[string][]string{} // Map deviceIds per platform var platformPerUserTransportIdMap = map[string][]string{} // Map userTransportId per platform // Timer to refresh devices list for all IoT platform const refreshTickerExpeary = 10 // In seconds // Enable profiling const profiling = false var profilingTimers map[string]time.Time const ( headerAccept = "application/json" headerContentType = "application/json" ) // NewIotMgr - Creates and initializes a new IOT Traffic Manager func NewIotMgr(name string, namespace string) (tm *IotMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err } // Create new Traffic Manager tm = new(IotMgr) tm.name = name if namespace != "" { tm.namespace = namespace } else { tm.namespace = "default" } tm.init() return tm, nil } // Profiling init func (tm *IotMgr) init() { if profiling { profilingTimers = make(map[string]time.Time) } registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 0) devicesMap = make(map[string]DeviceInfo, 0) devicesPerPlatformMap = make(map[string][]string, 0) platformPerUserTransportIdMap = make(map[string][]string, 0) tm.refreshTicker = nil } // DeleteIotMgr - func (tm *IotMgr) DeleteIotMgr() (err error) { tm.stopRefreshTicker() return nil } func (tm *IotMgr) startRefreshTicker() { log.Debug("Starting refresh loop") tm.refreshTicker = time.NewTicker(refreshTickerExpeary * time.Second) go func() { if tm.refreshTicker != nil { for range tm.refreshTicker.C { // Refresh the list of devices tm.wg.Add(1) err := tm.populateDevicesPerIotPlatforms() if err != nil { log.Error(err) } tm.wg.Done() } } }() } func (tm *IotMgr) stopRefreshTicker() { if tm.refreshTicker != nil { // Refresh the list of devices tm.wg.Add(1) tm.refreshTicker.Stop() tm.refreshTicker = nil registeredIotPlatformsMap = nil devicesMap = nil devicesPerPlatformMap = nil platformPerUserTransportIdMap = nil tm.wg.Done() log.Debug("Refresh loop stopped") } } func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() } log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo) if iotPlatformInfo.Enabled { registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo } // else, Skip disabled platform if len(registeredIotPlatformsMap) == 1 { log.Info("RegisterIotPlatformInfo: Starting timer") tm.startRefreshTicker() log.Info("RegisterIotPlatformInfo: timer running") } if profiling { now := time.Now() log.Debug("RegisterIotPlatformInfo: ", now.Sub(profilingTimers["RegisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { if profiling { profilingTimers["DeregisterIotPlatformInfo"] = time.Now() } tm.wg.Wait() log.Info("DeregisterIotPlatformInfo: After Synchro") log.Info(">>> DeregisterIotPlatformInfo: iotPlatformId: ", iotPlatformId) // Remove the list of the devices for this IoT platform if val, ok := devicesPerPlatformMap[iotPlatformId]; ok { // Free resources from devicesMap map for _, dev := range val { delete(devicesMap, dev) } // End of 'for' statement delete(devicesPerPlatformMap, iotPlatformId) log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (before): ", iotPlatformId) for _, rule := range platformPerUserTransportIdMap { for idx, pltf := range rule { if pltf == iotPlatformId { rule = append(rule[:idx], rule[idx+1:]...) } } // End of 'for' statement } // End of 'for' statement log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (after): ", iotPlatformId) } if _, ok := registeredIotPlatformsMap[iotPlatformId]; ok { delete(registeredIotPlatformsMap, iotPlatformId) if len(registeredIotPlatformsMap) == 0 { tm.stopRefreshTicker() } } if profiling { now := time.Now() log.Debug("DeregisterIotPlatformInfo: ", now.Sub(profilingTimers["DeregisterIotPlatformInfo"])) } return nil } func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { if profiling { profilingTimers["GetDevices"] = time.Now() } log.Info(">>> GetDevices") tm.wg.Wait() log.Info("GetDevices: After Synchro") devices = make([]DeviceInfo, 0) if len(registeredIotPlatformsMap) == 0 { return devices, nil } for _, v := range devicesMap { log.Info("GetDevices: adding device: ", v) devices = append(devices, v) } // End of 'for' statement log.Info("GetDevices: devices: ", devices) if profiling { now := time.Now() log.Debug("GetDevices: ", now.Sub(profilingTimers["GetDevices"])) } return devices, nil } func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { if profiling { profilingTimers["GetDevice"] = time.Now() } log.Info(">>> GetDevice: deviceId: ", deviceId) tm.wg.Wait() log.Info("GetDevice: After Synchro") if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err } else { device = val } if profiling { now := time.Now() log.Debug("GetDevice: ", now.Sub(profilingTimers["GetDevice"])) } log.Info("GetDevice: device: ", device) return device, nil } func (tm *IotMgr) CreateDevice(device DeviceInfo) (deviceResp DeviceInfo, err error) { log.Info(">>> CreateDevice: ", device) // RequestedMecTrafficRule is not supported yet if len(device.RequestedMecTrafficRule) != 0 { err = errors.New("Unsupported traffic rule provided") log.Error(err.Error()) return deviceResp, nil } if len(device.RequestedIotPlatformId) != 0 { deviceResp, err = tm.createDeviceWithIotPlatformId(device, device.RequestedIotPlatformId) } else { deviceResp, err = tm.createDeviceWithRequestedUserTransportId(device, device.RequestedUserTransportId) } if err != nil { log.Error(err.Error()) return deviceResp, err } log.Info("CreateDevice: deviceResp: ", deviceResp) return deviceResp, nil } /* * func populateDevicesPerIotPlatforms IoT devices for all registered Iot platform * @return {struct} nil on success, error otherwise */ func (tm *IotMgr) populateDevicesPerIotPlatforms() error { tm.mutex.Lock() defer tm.mutex.Unlock() if profiling { profilingTimers["populateDevicesPerIotPlatforms"] = time.Now() } if len(registeredIotPlatformsMap) == 0 { return nil } // Refresh the list of devices for all registered Iot platform for _, iotPlatform := range registeredIotPlatformsMap { log.Debug("populateDevicesPerIotPlatforms: processing: ", iotPlatform.IotPlatformId) err := tm.populateDevices(iotPlatform) if err != nil { log.Error("populateDevicesPerIotPlatforms: ", err) continue } } // End of 'for' statement if profiling { now := time.Now() log.Debug("populateDevicesPerIotPlatforms: ", now.Sub(profilingTimers["populateDevicesPerIotPlatforms"])) } return nil } /* * func PopulateDevices IoT devices for the specified Iot platform * @param {string} iotPlatformId contains the IoT platform identifier * @return {struct} nil on success, error otherwise */ func (tm *IotMgr) populateDevices(iotPlatformInfo IotPlatformInfo) error { if profiling { profilingTimers["populateDevices"] = time.Now() } log.Info(">>> populateDevices: iotPlatformId=", iotPlatformInfo.IotPlatformId) // 1. Get the list of the AE // Build the URL t := registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].CustomServicesTransportInfo[0] log.Debug("populateDevices: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name log.Debug("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for device // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: response: ", string(response)) var oneM2M_uril map[string][]string err = json.Unmarshal(response, &oneM2M_uril) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: oneM2M_uril: ", len(oneM2M_uril)) log.Debug(oneM2M_uril) if _, ok := oneM2M_uril["m2m:uril"]; !ok { err := errors.New("populateDevices: Key not found: m2m:uril") log.Error(err.Error()) return err } // Loop for each CIN and build the device list for _, v := range oneM2M_uril["m2m:uril"] { log.Debug("populateDevices: Processing key: ", v) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + v log.Debug("populateDevices: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} queries["fu"] = "2" // Filter usage // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("populateDevices: ", err.Error()) return err } log.Debug("populateDevices: response: ", string(response)) var oneM2M_cin map[string]map[string]interface{} err = json.Unmarshal(response, &oneM2M_cin) if err != nil { log.Error("populateDevices: ", err.Error()) continue } log.Debug("populateDevices: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) log.Debug("populateDevices: len(oneM2M_cin): ", len(oneM2M_cin)) log.Debug("populateDevices: oneM2M_cin: ", oneM2M_cin) for _, m := range oneM2M_cin { //log.Debug("==> ", i, " value is ", m) var device = DeviceInfo{} device.RequestedIotPlatformId = iotPlatformInfo.IotPlatformId device.RequestedUserTransportId = registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId].UserTransportInfo[0].Id device.Enabled = true // m is a map[string]interface. // loop over keys and values in the map. for k, v := range m { log.Debug(k, " value is ", v) log.Debug("populateDevices: type(v): ", reflect.TypeOf(v)) if item, ok := v.(string); ok { device.DeviceMetadata = append( device.DeviceMetadata, KeyValuePair{ Key: k, Value: string(item), }) } else if item, ok := v.(float64); ok { device.DeviceMetadata = append( device.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatFloat(item, 'f', -1, 64), }) } else if item, ok := v.(int64); ok { device.DeviceMetadata = append( device.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatInt(item, 10), }) } else if item, ok := v.(bool); ok { device.DeviceMetadata = append( device.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatBool(item), }) } else if item, ok := v.([]string); ok { device.DeviceMetadata = append( device.DeviceMetadata, KeyValuePair{ Key: k, Value: strings.Join(item, ","), }) } else { log.Error("populateDevices: Failed to process ", k) } // if k == "rn" { // if item, ok := v.(string); ok { // device.DeviceId = item // } else { // log.Error("populateDevices: Failed to process ", k) // } // } else if k == "ri" { // if item, ok := v.(string); ok { // device.SensorIdentifier = item // } else { // log.Error("populateDevices: Failed to process ", k) // } // } else if k == "ty" { // if item, ok := v.(float64); ok { // device.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64) // } else { // log.Error("populateDevices: Failed to process ", k) // } // } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" { // if item, ok := v.(string); ok { // device.SensorCharacteristicList = append( // device.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: string(item), // CharacteristicUnitOfMeasure: nil, // }) // } else if item, ok := v.(float64); ok { // device.SensorCharacteristicList = append( // device.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), // CharacteristicUnitOfMeasure: nil, // }) // } else if item, ok := v.([]string); ok { // device.SensorCharacteristicList = append( // device.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strings.Join(item, ","), // CharacteristicUnitOfMeasure: nil, // }) // } else { // log.Error("populateDevices: Failed to process ", k) // } // } } // End of 'for' loop log.Info("populateDevices: device: ", device) devicesMap[device.DeviceId] = device devicesPerPlatformMap[device.RequestedIotPlatformId] = append(devicesPerPlatformMap[device.RequestedIotPlatformId], device.DeviceId) } // End of 'for' loop } // End of 'for' statement log.Info("populateDevices: devicesMap: ", devicesMap) log.Info("populateDevices: devicesPerPlatformMap: ", devicesPerPlatformMap) if profiling { now := time.Now() log.Debug("populateDevices: ", now.Sub(profilingTimers["populateDevices"])) } return nil } func (tm *IotMgr) createDeviceWithIotPlatformId(device DeviceInfo, requestedIotPlatformId string) (deviceResp DeviceInfo, err error) { log.Info(">>> createDeviceWithIotPlatformId: ", device) tm.wg.Wait() log.Info("createDeviceWithIotPlatformId: After Synchro") // Sanity checks if _, ok := registeredIotPlatformsMap[requestedIotPlatformId]; !ok { err = errors.New("Invalid IotPlatform identifier") return deviceResp, err } deviceResp, err = tm.oneM2M_create(device, requestedIotPlatformId, "ae") if err != nil { log.Error("createDeviceWithIotPlatformId: ", err.Error()) return device, err } log.Debug("createDeviceWithIotPlatformId: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) createDeviceWithRequestedUserTransportId(device DeviceInfo, requestedUserTransportId string) (deviceResp DeviceInfo, err error) { log.Info(">>> createDeviceWithRequestedUserTransportId: ", device) tm.wg.Wait() log.Info("createDeviceWithIotPlatformId: After Synchro") if val, ok := platformPerUserTransportIdMap[requestedUserTransportId]; ok { deviceResp, err = tm.createDeviceWithIotPlatformId(device, val[0]) } else { err = errors.New("Invalid UserTransportId") } if err != nil { log.Error("createDeviceWithIotPlatformId: ", err.Error()) return deviceResp, err } log.Info("createDeviceWithIotPlatformId: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) oneM2M_create(device DeviceInfo, requestedIotPlatformId string, type_ string) (deviceResp DeviceInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless t := registeredIotPlatformsMap[requestedIotPlatformId].CustomServicesTransportInfo[0] log.Debug("oneM2M_create: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} var s string if type_ == "AE" { s = headerContentType + ";ty=2" } else if type_ == "CNT" { s = headerContentType + ";ty=4" } headers["Content-Type"] = []string{s} // Build the url and the body var url string var bodyMap = map[string]map[string]interface{}{} // Initialize the entry if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer bodyMap["m2m:ae"] = make(map[string]interface{}, 0) bodyMap["m2m:ae"]["api"] = "Norg.etsi." + requestedIotPlatformId + "." + device.DeviceId bodyMap["m2m:ae"]["rn"] = device.DeviceId bodyMap["m2m:ae"]["rr"] = false bodyMap["m2m:ae"]["srv"] = []string{t.Version} url = "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name } else if type_ == "CNT" { bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) bodyMap["m2m:cnt"]["mbs"] = 10000 bodyMap["m2m:cnt"]["mni"] = 10 bodyMap["m2m:cnt"]["rn"] = device.DeviceId bodyMap["m2m:cnt"]["srv"] = []string{t.Version} // Add metadata if len(device.DeviceMetadata) != 0 { for _, val := range device.DeviceMetadata { log.Debug("oneM2M_create: Adding CNT metadata: ", val) // FIXME FSCOM Add metadata } // End of 'for' statement } url = "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name } else { err = errors.New("oneM2M_create: Invalid type") log.Error("oneM2M_create: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_create: url=", url) log.Debug("oneM2M_create: bodyMap=", bodyMap) body, err := json.Marshal(bodyMap) if err != nil { log.Error("oneM2M_create: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_create: Request body: ", string(body)) // Send the request response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) if err != nil { log.Error("oneM2M_create: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_create: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_create: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_create: d: ", d) // Add additional entries deviceResp, err = tm.oneM2M_deserialize(deviceResp, d) if err != nil { log.Error("oneM2M_create: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_create: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) oneM2M_discovery(device DeviceInfo, requestedIotPlatformId string, type_ string) (deviceResp DeviceInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless // 1. Get the list of the AE // Build the URL t := registeredIotPlatformsMap[requestedIotPlatformId].CustomServicesTransportInfo[0] log.Debug("oneM2M_discovery: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + t.Name log.Debug("oneM2M_discovery: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} if type_ == "CN" { queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for device } else { err = errors.New("oneM2M_discovery: Invalid type") log.Error("oneM2M_discovery: ", err.Error()) return deviceResp, err } // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("oneM2M_discovery: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_discovery: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_discovery: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_discovery: d: ", d) // Add additional entries deviceResp, err = tm.oneM2M_deserialize(device, d) if err != nil { log.Error("oneM2M_discovery: ", err.Error()) return deviceResp, err } // log.Debug("oneM2M_discovery: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) oneM2M_get(device DeviceInfo, requestedIotPlatformId string, type_ string) (deviceResp DeviceInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless ri := "" for _, val := range device.DeviceMetadata { if val.Key == "ri" { ri = val.Value break } } if ri == "" { err = errors.New("oneM2M_get: Cannot find \"ri\" value") log.Error("oneM2M_get: ", err.Error()) return deviceResp, err } // 1. Get the list of the AE // Build the URL t := registeredIotPlatformsMap[requestedIotPlatformId].CustomServicesTransportInfo[0] log.Debug("oneM2M_get: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + ri log.Debug("oneM2M_get: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Build the queries queries := map[string]string{} if type_ == "AE" { queries = nil } else if type_ == "CN" { queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for device } else { err = errors.New("oneM2M_get: Invalid type") log.Error("oneM2M_get: ", err.Error()) return deviceResp, err } // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("oneM2M_get: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_get: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_get: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_get: d: ", d) // Add additional entries deviceResp, err = tm.oneM2M_deserialize(device, d) if err != nil { log.Error("oneM2M_get: ", err.Error()) return deviceResp, err } log.Debug("oneM2M_get: deviceResp: ", deviceResp) return deviceResp, nil } func (tm *IotMgr) oneM2M_delete(device DeviceInfo, requestedIotPlatformId string, type_ string) (err error) { // FIXME FSCOM: requestedIotPlatformId should be useless ri := "" for _, val := range device.DeviceMetadata { if val.Key == "ri" { ri = val.Value break } } if ri == "" { err = errors.New("oneM2M_delete: Cannot find \"ri\" value") log.Error("oneM2M_delete: ", err.Error()) return err } // Build the URL t := registeredIotPlatformsMap[requestedIotPlatformId].CustomServicesTransportInfo[0] log.Debug("oneM2M_delete: t.Endpoint.Addresses[0]=", t.Endpoint.Addresses[0]) url := "http://" + t.Endpoint.Addresses[0].Host + ":" + strconv.Itoa(int(t.Endpoint.Addresses[0].Port)) + "/" + ri log.Debug("oneM2M_delete: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{t.Version} // Send the request _, err = sendRequest("DELETE", url, headers, nil, nil, nil, 200) if err != nil { log.Error("oneM2M_delete: ", err.Error()) return err } return nil } func (tm *IotMgr) oneM2M_deserialize(device DeviceInfo, response map[string]map[string]interface{}) (deviceResp DeviceInfo, err error) { deviceResp = device // Same data type log.Debug("oneM2M_deserialize: type(response): ", reflect.TypeOf(response)) log.Debug("oneM2M_deserialize: len(response): ", len(response)) log.Debug("oneM2M_deserialize: response: ", response) if val, ok := response["m2m:ae"]; ok { log.Debug("oneM2M_deserialize: val: ", val) } else { log.Error("oneM2M_deserialize: Key not found") } for i, m := range response { log.Debug("==> ", i, " value is ", m) // m is a map[string]interface. // loop over keys and values in the map. for k, v := range m { log.Debug(k, " value is ", v) log.Debug("oneM2M_deserialize: type(v): ", reflect.TypeOf(v)) if item, ok := v.(string); ok { deviceResp.DeviceMetadata = append( deviceResp.DeviceMetadata, KeyValuePair{ Key: k, Value: string(item), }) } else if item, ok := v.(float64); ok { deviceResp.DeviceMetadata = append( deviceResp.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatFloat(item, 'f', -1, 64), }) } else if item, ok := v.(int64); ok { deviceResp.DeviceMetadata = append( deviceResp.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatInt(item, 10), }) } else if item, ok := v.(bool); ok { deviceResp.DeviceMetadata = append( deviceResp.DeviceMetadata, KeyValuePair{ Key: k, Value: strconv.FormatBool(item), }) } else if item, ok := v.([]string); ok { deviceResp.DeviceMetadata = append( deviceResp.DeviceMetadata, KeyValuePair{ Key: k, Value: strings.Join(item, ","), }) } else if item, ok := v.([]int64); ok { log.Error("oneM2M_deserialize: Failed to convert list of int64 into string: ", item) } else if _, ok := v.([]interface{}); ok { log.Error("oneM2M_deserialize: Failed to convert []interface {} into string: ", v.([]interface{})) } else { log.Error("oneM2M_deserialize: Failed to process: ", k) } } // End of 'for' loop } // End of 'for' loop return deviceResp, nil } func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) { //log.Debug(">>> sendRequest: url: ", url) //log.Debug(">>> sendRequest: headers: ", headers) req, err := http.NewRequest(method, url, body) if err != nil || req == nil { return nil, err } if vars != nil { req = mux.SetURLVars(req, vars) } if query != nil { q := req.URL.Query() for k, v := range query { q.Add(k, v) } req.URL.RawQuery = q.Encode() } req.Header = headers req.Close = true //log.Debug("sendRequest: req: ", req) rr, err := http.DefaultClient.Do(req) if err != nil { return nil, err } // Check the status code is what we expect. //log.Debug("sendRequest: rr: ", rr) if status := rr.StatusCode; status != code { s := fmt.Sprintf("Wrong status code - got %v want %v", status, code) return nil, errors.New(s) } responseData, err := ioutil.ReadAll(rr.Body) if err != nil { return nil, err } //log.Debug("sendRequest: responseData: ", responseData) return responseData, nil }