/* * Copyright (c) 2024-2025 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package sssmgr import ( "bytes" "encoding/json" "errors" "fmt" "net/http" "reflect" "strconv" "strings" "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" uuid "github.com/google/uuid" ) // Sensors-Sharing Service Manager type SssMgr struct { name string namespace string bindingProtocol string host string port int cse_name string hostId string mutex sync.Mutex wg sync.WaitGroup refreshTicker *time.Ticker sss_discovery_notify func() sss_status_notify func() sss_data_notify func() } type IotPlatformInfo struct { Address string Port int Name string IotPlatformId string } type Point struct { Latitude float64 Longitude float64 } type SensorCharacteristic struct { CharacteristicName string CharacteristicValue string CharacteristicUnitOfMeasure *string } type SensorDiscoveryInfo struct { SensorIdentifier string SensorType string SensorPropertyList []string SensorCharacteristicList []SensorCharacteristic SensorPosition *Point IotPlatformId string } var registeredIotPlatformsMap = map[string]IotPlatformInfo{} // List of discovered IOT Plateform var sensorsMap = map[string]SensorDiscoveryInfo{} // Map sensors by sensorIdentifier var sensorsPerPlatformMap = map[string][]string{} // Map dsensorIdentifiers per platform // Timer to refresh devices list for all IoT platform const refreshTickerExpeary = 10 // In seconds // Enable profiling const profiling = false var profilingTimers map[string]time.Time var protocol SssMgrBindingProtocol // NewSssMgr - Creates and initializes a new SSS Traffic Manager func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, hostId string, cse_name string, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err } // Create new Traffic Manager tm = new(SssMgr) tm.name = name if namespace != "" { tm.namespace = namespace } else { tm.namespace = "default" } tm.bindingProtocol = bindingProtocol tm.host = host tm.port = port tm.cse_name = cse_name tm.hostId = hostId if tm.bindingProtocol == "MQTT" { if tm.host == "" { err := errors.New("Host not set for MQTTP protocol") log.Error(err.Error()) return nil, err } if tm.port == 0 { tm.port = 1883 } protocol = NewSssMgrMqtt() } else if tm.bindingProtocol == "REST_HTTP" { if tm.port == 0 { tm.port = 80 } protocol = NewSssMgrHttp() } else { err := errors.New("Binding protocol not set") log.Error(err.Error()) return nil, err } if hostId == "" { err := errors.New("hostId not set") log.Error(err.Error()) return nil, err } if cse_name == "" { err := errors.New("cse_name not set") log.Error(err.Error()) return nil, err } err = protocol.init(tm) if err != nil { log.Error(err.Error()) return nil, err } tm.sss_discovery_notify = sss_discovery_notify tm.sss_status_notify = sss_status_notify tm.sss_data_notify = sss_data_notify tm.init() return tm, nil } // Profiling init func (tm *SssMgr) init() { if profiling { profilingTimers = make(map[string]time.Time) } registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 1) registeredIotPlatformsMap[tm.hostId] = IotPlatformInfo{ Address: tm.host, Port: tm.port, Name: tm.cse_name, IotPlatformId: tm.hostId, } sensorsMap = make(map[string]SensorDiscoveryInfo, 0) sensorsPerPlatformMap = make(map[string][]string, 0) tm.refreshTicker = nil } // DeleteSssMgr - func (tm *SssMgr) DeleteSssMgr() (err error) { tm.stopRefreshTicker() if protocol != nil { protocol.uninit() protocol = nil } return nil } func (tm *SssMgr) startRefreshTicker() { log.Debug("Starting refresh loop") tm.refreshTicker = time.NewTicker(refreshTickerExpeary * time.Second) go func() { if tm.refreshTicker != nil { for range tm.refreshTicker.C { // Refresh the list of devices tm.wg.Add(1) err := tm.populateDevicesPerIotPlatforms() if err != nil { log.Error(err) } tm.wg.Done() } } }() } func (tm *SssMgr) stopRefreshTicker() { if tm.refreshTicker != nil { // Refresh the list of devices tm.wg.Add(1) tm.refreshTicker.Stop() tm.refreshTicker = nil registeredIotPlatformsMap = nil sensorsMap = nil sensorsPerPlatformMap = nil tm.wg.Done() log.Debug("Refresh loop stopped") } } func (tm *SssMgr) SensorDiscoveryInfoAll() (sensors []SensorDiscoveryInfo, err error) { if profiling { profilingTimers["SensorDiscoveryInfoAll"] = time.Now() } log.Info(">>> SensorDiscoveryInfoAll") err = tm.populateDevicesPerIotPlatforms() // FIXME FSCOM User timer. See startRefreshTicker if err != nil { return sensors, err } tm.wg.Wait() log.Info("SensorDiscoveryInfoAll: After Synchro") sensors = make([]SensorDiscoveryInfo, 0) if len(registeredIotPlatformsMap) == 0 { return sensors, nil } for _, v := range sensorsMap { log.Info("SensorDiscoveryInfoAll: adding sensor: ", v) sensors = append(sensors, v) } // End of 'for' statement log.Info("SensorDiscoveryInfoAll: sensors: ", sensors) if profiling { now := time.Now() log.Debug("SensorDiscoveryInfoAll: ", now.Sub(profilingTimers["SensorDiscoveryInfoAll"])) } return sensors, nil } func (tm *SssMgr) GetSensor(sensorIdentifier string) (sensor SensorDiscoveryInfo, err error) { if profiling { profilingTimers["GetSensor"] = time.Now() } log.Info(">>> GetSensor: sensorIdentifier: ", sensorIdentifier) tm.wg.Wait() log.Info("GetSensor: After Synchro") if val, ok := sensorsMap[sensorIdentifier]; !ok { err = errors.New("Wrong Device identifier") return sensor, err } else { sensor = val } if profiling { now := time.Now() log.Debug("GetSensor: ", now.Sub(profilingTimers["GetSensor"])) } log.Info("GetSensor: sensor: ", sensor) return sensor, nil } /* * func populateDevicesPerIotPlatforms IoT devices for all registered Iot platform * @return {struct} nil on success, error otherwise */ func (tm *SssMgr) populateDevicesPerIotPlatforms() error { tm.mutex.Lock() defer tm.mutex.Unlock() if profiling { profilingTimers["populateDevicesPerIotPlatforms"] = time.Now() } if len(registeredIotPlatformsMap) == 0 { return nil } // Refresh the list of devices for all registered Iot platform for _, iotPlatform := range registeredIotPlatformsMap { log.Debug("populateDevicesPerIotPlatforms: processing: ", iotPlatform.Address) err := tm.populateSensors(iotPlatform) if err != nil { log.Error("populateDevicesPerIotPlatforms: ", err) continue } } // End of 'for' statement if profiling { now := time.Now() log.Debug("populateDevicesPerIotPlatforms: ", now.Sub(profilingTimers["populateDevicesPerIotPlatforms"])) } return nil } /* * func PopulateDevices IoT devices for the specified Iot platform * @param {string} iotPlatformId contains the IoT platform identifier * @return {struct} nil on success, error otherwise */ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { if profiling { profilingTimers["populateSensors"] = time.Now() } log.Info(">>> populateSensors: iotPlatformInfo=", iotPlatformInfo) // 1. Get the list of the AE // Build the context var ctx = SssMgrBindingProtocolContext{ host: iotPlatformInfo.Address, port: iotPlatformInfo.Port, name: iotPlatformInfo.Name, to: iotPlatformInfo.Name, from: "Admin", // FIXME FSCOM How to get it op: 2, // RETRIEVE ty: -1, rqi: uuid.New().String(), rvi: []string{"4"}, // FIXME FSCOM How to get it code: 200, } // Build the queries queries := map[string]string{} queries["fu"] = "1" // Filter usage queries["ty"] = "3" // Filter on oneM2M CIN for sensors ctx.queries = queries err, resp := protocol.send(ctx) if err != nil { log.Error("OneM2M_create: ", err.Error()) return err } log.Debug("populateSensors: resp: ", resp) log.Debug("populateSensors: TypeOf(resp): ", reflect.TypeOf(resp)) oneM2M_uril := resp.(map[string]interface{}) log.Debug("populateSensors: oneM2M_uril: ", oneM2M_uril) log.Debug("populateSensors: TypeOf(oneM2M_uril): ", reflect.TypeOf(oneM2M_uril)) log.Debug("populateSensors: len(oneM2M_uril): ", len(oneM2M_uril)) // Loop for each CIN and build the sensor list for _, v := range oneM2M_uril["m2m:uril"].([]interface{}) { log.Debug("populateSensors: Processing key: v: ", v) log.Debug("populateSensors: Processing key: TypeOf(v): ", reflect.TypeOf(v)) s := v.(string) if s == "laboai-cse-in/acpCreateACPs" || s == "laboai-cse-in/CAdmin" { // FIXME FSCOM Bug in MQTT DISCOVERY request which does not provide the same response that HTTP DISCOVERY with the same filter criteria continue // Discard it } ctx.to = s ctx.queries["fu"] = "2" err, resp := protocol.send(ctx) if err != nil { log.Error("OneM2M_create: ", err.Error()) continue } log.Debug("populateSensors: resp: ", resp) log.Debug("populateSensors: type(resp): ", reflect.TypeOf(resp)) if resp.(map[string]interface{}) == nil || resp.(map[string]interface{})["m2m:cnt"] == nil { continue } oneM2M_cin := resp.(map[string]interface{})["m2m:cnt"].(map[string]interface{}) log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin)) log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin) var sensor = SensorDiscoveryInfo{ IotPlatformId: iotPlatformInfo.IotPlatformId, } for k, v := range oneM2M_cin { log.Debug(k, " value is ", v) log.Debug("populateSensors: type(v): ", reflect.TypeOf(v)) if k == "ri" { if item, ok := v.(string); ok { sensor.SensorIdentifier = item } else { log.Error("populateSensors: Failed to process ", k) } } else if k == "ty" { if item, ok := v.(float64); ok { sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64) } else { log.Error("populateSensors: Failed to process ", k) } } else { sensor.SensorPropertyList = append(sensor.SensorPropertyList, k) if item, ok := v.(string); ok { sensor.SensorCharacteristicList = append( sensor.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: string(item), }) } else if item, ok := v.(float64); ok { sensor.SensorCharacteristicList = append( sensor.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), }) } else if item, ok := v.(int64); ok { sensor.SensorCharacteristicList = append( sensor.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatInt(item, 10), }) } else if item, ok := v.(bool); ok { sensor.SensorCharacteristicList = append( sensor.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatBool(item), }) } else if item, ok := v.([]string); ok { sensor.SensorCharacteristicList = append( sensor.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strings.Join(item, ","), }) } else if item, ok := v.([]int64); ok { log.Error("populateSensors: Failed to convert list of int64 into string: ", item) } else if item, ok := v.([]interface{}); ok { log.Debug("populateSensors: Got []interface {} for ", k) log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) s := SensorCharacteristic{ CharacteristicName: k, } var buf bytes.Buffer fmt.Fprintf(&buf, "%T", reflect.ValueOf(item)) s.CharacteristicValue = buf.String() sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) } else { log.Error("populateSensors: Failed to process ", k) } } } // End of 'for' statement log.Info("populateSensors: sensor: ", sensor) sensorsMap[sensor.SensorIdentifier] = sensor sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier) } // End of 'for' statement // // 1. Get the list of the AE // // Build the URL // url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + iotPlatformInfo.Name // log.Debug("populateSensors: url=", url) // // Build the headers // var headers = http.Header{} // headers["Accept"] = []string{headerAccept} // headers["Content-Type"] = []string{headerContentType} // headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it // headers["X-M2M-RI"] = []string{uuid.New().String()} // headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it // // Build the queries // queries := map[string]string{} // queries["fu"] = "1" // Filter usage // queries["ty"] = "4" // Filter on oneM2M CIN for sensors // // Send the request // response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) // if err != nil { // log.Error("populateSensors: ", err.Error()) // return err // } // log.Debug("populateSensors: response: ", string(response)) // var oneM2M_uril map[string][]string // err = json.Unmarshal(response, &oneM2M_uril) // if err != nil { // log.Error("populateSensors: ", err.Error()) // return err // } // log.Debug("populateSensors: oneM2M_uril: ", len(oneM2M_uril)) // log.Debug(oneM2M_uril) // if _, ok := oneM2M_uril["m2m:uril"]; !ok { // err := errors.New("populateSensors: CharacteristicName not found: m2m:uril") // log.Error(err.Error()) // return err // } // // Loop for each CIN and build the sensor list // for _, v := range oneM2M_uril["m2m:uril"] { // log.Debug("populateSensors: Processing key: ", v) // url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + v // log.Debug("populateSensors: url=", url) // // Build the headers // var headers = http.Header{} // headers["Accept"] = []string{headerAccept} // headers["Content-Type"] = []string{headerContentType} // headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it // headers["X-M2M-RI"] = []string{uuid.New().String()} // headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it // // Build the queries // queries := map[string]string{} // queries["fu"] = "2" // Filter usage // // Send the request // response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) // if err != nil { // log.Error("populateSensors: ", err.Error()) // return err // } // log.Debug("populateSensors: response: ", string(response)) // var oneM2M_cin map[string]map[string]interface{} // err = json.Unmarshal(response, &oneM2M_cin) // if err != nil { // log.Error("populateSensors: ", err.Error()) // continue // } // log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) // log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin)) // log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin) // for _, m := range oneM2M_cin { // //log.Debug("==> ", i, " value is ", m) // var sensor = SensorDiscoveryInfo{ // IotPlatformId: iotPlatformInfo.IotPlatformId, // } // // m is a map[string]interface. // // loop over keys and values in the map. // for k, v := range m { // log.Debug(k, " value is ", v) // log.Debug("populateSensors: type(v): ", reflect.TypeOf(v)) // if k == "ri" { // if item, ok := v.(string); ok { // sensor.SensorIdentifier = item // } else { // log.Error("populateSensors: Failed to process ", k) // } // } else if k == "ty" { // if item, ok := v.(float64); ok { // sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64) // } else { // log.Error("populateSensors: Failed to process ", k) // } // } else { // sensor.SensorPropertyList = append(sensor.SensorPropertyList, k) // if item, ok := v.(string); ok { // sensor.SensorCharacteristicList = append( // sensor.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: string(item), // }) // } else if item, ok := v.(float64); ok { // sensor.SensorCharacteristicList = append( // sensor.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), // }) // } else if item, ok := v.(int64); ok { // sensor.SensorCharacteristicList = append( // sensor.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strconv.FormatInt(item, 10), // }) // } else if item, ok := v.(bool); ok { // sensor.SensorCharacteristicList = append( // sensor.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strconv.FormatBool(item), // }) // } else if item, ok := v.([]string); ok { // sensor.SensorCharacteristicList = append( // sensor.SensorCharacteristicList, // SensorCharacteristic{ // CharacteristicName: k, // CharacteristicValue: strings.Join(item, ","), // }) // } else if item, ok := v.([]int64); ok { // log.Error("populateSensors: Failed to convert list of int64 into string: ", item) // } else if item, ok := v.([]interface{}); ok { // log.Debug("populateSensors: Got []interface {} for ", k) // log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) // s := SensorCharacteristic{ // CharacteristicName: k, // } // var buf bytes.Buffer // fmt.Fprintf(&buf, "%T", reflect.ValueOf(item)) // s.CharacteristicValue = buf.String() // sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) // } else { // log.Error("populateSensors: Failed to process ", k) // } // } // // if k == "rn" { // // if item, ok := v.(string); ok { // // sensor.DeviceId = item // // } else { // // log.Error("populateSensors: Failed to process ", k) // // } // // } else if k == "ri" { // // if item, ok := v.(string); ok { // // sensor.SensorIdentifier = item // // } else { // // log.Error("populateSensors: Failed to process ", k) // // } // // } else if k == "ty" { // // if item, ok := v.(float64); ok { // // sensor.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64) // // } else { // // log.Error("populateSensors: Failed to process ", k) // // } // // } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" { // // if item, ok := v.(string); ok { // // sensor.SensorCharacteristicList = append( // // sensor.SensorCharacteristicList, // // SensorCharacteristic{ // // CharacteristicName: k, // // CharacteristicValue: string(item), // // CharacteristicUnitOfMeasure: nil, // // }) // // } else if item, ok := v.(float64); ok { // // sensor.SensorCharacteristicList = append( // // sensor.SensorCharacteristicList, // // SensorCharacteristic{ // // CharacteristicName: k, // // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), // // CharacteristicUnitOfMeasure: nil, // // }) // // } else if item, ok := v.([]string); ok { // // sensor.SensorCharacteristicList = append( // // sensor.SensorCharacteristicList, // // SensorCharacteristic{ // // CharacteristicName: k, // // CharacteristicValue: strings.Join(item, ","), // // CharacteristicUnitOfMeasure: nil, // // }) // // } else { // // log.Error("populateSensors: Failed to process ", k) // // } // // } // } // End of 'for' loop // log.Info("populateSensors: sensor: ", sensor) // sensorsMap[sensor.SensorIdentifier] = sensor // sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier) // } // End of 'for' loop // } // End of 'for' statement log.Info("populateSensors: sensorsMap: ", sensorsMap) log.Info("populateSensors: sensorsPerPlatformMap: ", sensorsPerPlatformMap) if profiling { now := time.Now() log.Debug("populateSensors: ", now.Sub(profilingTimers["populateSensors"])) } return nil } func (tm *SssMgr) OneM2M_create(sensor SensorDiscoveryInfo, path string) (sensorResp SensorDiscoveryInfo, err error) { if profiling { profilingTimers["OneM2M_create"] = time.Now() } log.Info(">>> OneM2M_create: sensor=", sensor) if sensor.IotPlatformId == "" { err = errors.New("IotPlatformId fiels shall be set") log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } tm.wg.Wait() log.Info("OneM2M_create: After Synchro") // Create the initial payload dictionary var bodyMap = map[string]map[string]interface{}{} // Initialize the entry if sensor.SensorType == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer bodyMap["m2m:ae"] = make(map[string]interface{}, 0) bodyMap["m2m:ae"]["api"] = "Norg.etsi." + sensor.IotPlatformId + "." + sensor.SensorIdentifier bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier bodyMap["m2m:ae"]["rr"] = false bodyMap["m2m:ae"]["srv"] = []string{"4"} // Add metadata if len(sensor.SensorCharacteristicList) != 0 { for _, val := range sensor.SensorCharacteristicList { log.Debug("OneM2M_create: Adding AE metadata: ", val) bodyMap["m2m:ae"][val.CharacteristicName] = val.CharacteristicValue } // End of 'for' statement } } else if sensor.SensorType == "CNT" { bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) bodyMap["m2m:cnt"]["mbs"] = 10000 bodyMap["m2m:cnt"]["mni"] = 10 bodyMap["m2m:cnt"]["rn"] = sensor.SensorIdentifier // Add metadata if len(sensor.SensorCharacteristicList) != 0 { for _, val := range sensor.SensorCharacteristicList { log.Debug("OneM2M_create: Adding CNT metadata: ", val) bodyMap["m2m:cnt"][val.CharacteristicName] = val.CharacteristicValue } // End of 'for' statement } } else if sensor.SensorType == "CNI" { bodyMap["m2m:cni"] = make(map[string]interface{}, 0) bodyMap["m2m:cni"]["cnf"] = "text/plain:0" bodyMap["m2m:cni"]["rn"] = sensor.SensorIdentifier // Add metadata if len(sensor.SensorCharacteristicList) != 0 { for _, val := range sensor.SensorCharacteristicList { log.Debug("OneM2M_create: Adding CNI metadata: ", val) bodyMap["m2m:cni"][val.CharacteristicName] = val.CharacteristicValue } // End of 'for' statement } } else { err = errors.New("OneM2M_create: Invalid type") log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } // Send it and get the result var ctx = SssMgrBindingProtocolContext{ host: registeredIotPlatformsMap[sensor.IotPlatformId].Address, port: registeredIotPlatformsMap[sensor.IotPlatformId].Port, name: registeredIotPlatformsMap[sensor.IotPlatformId].Name, to: registeredIotPlatformsMap[sensor.IotPlatformId].Name, from: sensor.IotPlatformId, op: 1, // CREATE rqi: uuid.New().String(), rvi: []string{"4"}, // FIXME FSCOM How to get it body: bodyMap, code: 201, } if path != "" { ctx.to = path } if sensor.SensorType == "AE" { ctx.ty = 2 } else if sensor.SensorType == "CNT" { ctx.ty = 3 } else if sensor.SensorType == "CNI" { ctx.ty = 4 } else { err = errors.New("OneM2M_create: Invalid type") log.Error("send: ", err.Error()) return sensorResp, err } err, resp := protocol.send(ctx) if err != nil { log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } log.Debug("OneM2M_create: resp: ", resp) log.Debug("OneM2M_create: TypeOf(resp): ", reflect.TypeOf(resp)) if _, ok := resp.(map[string]interface{}); !ok { log.Error("OneM2M_create: Interface not available") } // Add additional entries sensorResp.SensorIdentifier = sensor.SensorIdentifier sensorResp.SensorType = sensor.SensorType sensorResp.IotPlatformId = sensor.IotPlatformId sensorResp.SensorPosition = sensor.SensorPosition sensorResp, err = tm.oneM2M_deserialize(sensorResp, resp.(map[string]interface{})) if err != nil { log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } log.Debug("OneM2M_create: sensorResp: ", sensorResp) if profiling { now := time.Now() log.Debug("OneM2M_create: ", now.Sub(profilingTimers["OneM2M_create"])) } return sensorResp, nil } func (tm *SssMgr) oneM2M_discovery(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (sensorResp SensorDiscoveryInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless // 1. Get the list of the AE // Build the URL url := "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name log.Debug("oneM2M_discovery: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it // Build the queries queries := map[string]string{} if type_ == "CN" { queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for sensor } else { err = errors.New("oneM2M_discovery: Invalid type") log.Error("oneM2M_discovery: ", err.Error()) return sensorResp, err } // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("oneM2M_discovery: ", err.Error()) return sensorResp, err } log.Debug("oneM2M_discovery: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_discovery: ", err.Error()) return sensorResp, err } log.Debug("oneM2M_discovery: d: ", d) // Add additional entries // sensorResp, err = tm.oneM2M_deserialize(sensor, d) // if err != nil { // log.Error("oneM2M_discovery: ", err.Error()) // return sensorResp, err // } log.Debug("oneM2M_discovery: sensorResp: ", sensorResp) return sensorResp, nil } func (tm *SssMgr) oneM2M_get(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (sensorResp SensorDiscoveryInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless if sensor.SensorIdentifier == "" { err = errors.New("oneM2M_get: Cannot find \"ri\" value") log.Error("oneM2M_get: ", err.Error()) return sensorResp, err } // 1. Get the list of the AE // Build the URL url := "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + sensor.SensorIdentifier log.Debug("oneM2M_get: url=", url) // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["Content-Type"] = []string{headerContentType} headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it // Build the queries queries := map[string]string{} if type_ == "AE" { queries = nil } else if type_ == "CN" { queries["fu"] = "1" // Filter usage queries["ty"] = "4" // Filter on oneM2M CIN for sensor } else { err = errors.New("oneM2M_get: Invalid type") log.Error("oneM2M_get: ", err.Error()) return sensorResp, err } // Send the request response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) if err != nil { log.Error("oneM2M_get: ", err.Error()) return sensorResp, err } log.Debug("oneM2M_get: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_get: ", err.Error()) return sensorResp, err } log.Debug("oneM2M_get: d: ", d) // Add additional entries // sensorResp, err = tm.oneM2M_deserialize(sensor, d) // if err != nil { // log.Error("oneM2M_get: ", err.Error()) // return sensorResp, err // } log.Debug("oneM2M_get: sensorResp: ", sensorResp) return sensorResp, nil } func (tm *SssMgr) oneM2M_subscribe(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (subscription string, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless // Build the headers var headers = http.Header{} headers["Accept"] = []string{headerAccept} headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it headers["X-M2M-RI"] = []string{uuid.New().String()} headers["X-M2M-RVI"] = []string{"4"} s := headerContentType + ";ty=23" headers["Content-Type"] = []string{s} // Build the url and the body var url string var bodyMap = map[string]map[string]interface{}{} bodyMap["m2m:sub"] = make(map[string]interface{}, 0) net := make(map[string][4]int) net["net"] = [4]int{1, 2, 3, 4} bodyMap["m2m:sub"]["enc"] = net bodyMap["m2m:sub"]["nu"] = "" // FIXME The URI of the listener bodyMap["m2m:sub"]["rn"] = sensor.SensorIdentifier url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name log.Debug("oneM2M_subscribe: url=", url) log.Debug("oneM2M_subscribe: bodyMap=", bodyMap) body, err := json.Marshal(bodyMap) if err != nil { log.Error("oneM2M_subscribe: ", err.Error()) return "", err } log.Debug("oneM2M_subscribe: Request body: ", string(body)) // Send the request response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) if err != nil { log.Error("oneM2M_subscribe: ", err.Error()) return "", err } log.Debug("oneM2M_subscribe: response: ", string(response)) var d map[string]map[string]interface{} err = json.Unmarshal(response, &d) if err != nil { log.Error("oneM2M_subscribe: ", err.Error()) return "", err } log.Debug("oneM2M_subscribe: d: ", d) err = errors.New("oneM2M_subscribe: To be implemented") return "", err /*nil*/ } func (tm *SssMgr) OneM2M_Delete(sensor SensorDiscoveryInfo) (err error) { // FIXME FSCOM: requestedIotPlatformId should be useless if profiling { profilingTimers["OneM2M_Delete"] = time.Now() } log.Info(">>> OneM2M_Delete: sensor=", sensor) if sensor.SensorIdentifier == "" { err = errors.New("OneM2M_Delete: Cannot find \"ri\" value") log.Error("OneM2M_Delete: ", err.Error()) return err } if sensor.IotPlatformId == "" { err = errors.New("IotPlatformId fiels shall be set") log.Error("OneM2M_Delete: ", err.Error()) return err } tm.wg.Wait() log.Info("OneM2M_Delete: After Synchro") // Send it and get the result var ctx = SssMgrBindingProtocolContext{ host: registeredIotPlatformsMap[sensor.IotPlatformId].Address, port: registeredIotPlatformsMap[sensor.IotPlatformId].Port, name: registeredIotPlatformsMap[sensor.IotPlatformId].Name, to: sensor.SensorIdentifier, from: sensor.IotPlatformId, op: 4, // DELETE ty: -1, rqi: uuid.New().String(), rvi: []string{"4"}, // FIXME FSCOM How to get it code: 200, } err, _ = protocol.send(ctx) if err != nil { log.Error("OneM2M_Delete: ", err.Error()) return err } if profiling { now := time.Now() log.Debug("OneM2M_Delete: ", now.Sub(profilingTimers["OneM2M_Delete"])) } return nil } func (tm *SssMgr) oneM2M_subscribe_discovery_event(requestedIotPlatformId string) (err error) { return nil } func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[string]interface{}) (sensorResp SensorDiscoveryInfo, err error) { sensorResp = sensor // Same data structure log.Debug(">>> oneM2M_deserialize: response: ", response) for i, m := range response { log.Debug("==> ", i, " value is ", m) if _, ok := m.(map[string]interface{}); !ok { // Skip it log.Warn("oneM2M_deserialize: m is not map[string]interface{}") continue } // m is a map[string]interface. // loop over keys and values in the map. for k, v := range m.(map[string]interface{}) { log.Debug(k, " value is ", v) log.Debug("oneM2M_deserialize: type(v): ", reflect.TypeOf(v)) if k == "ri" { if item, ok := v.(string); ok { sensorResp.SensorIdentifier = item } else { log.Error("oneM2M_deserialize: Failed to process ", k) } } else if k == "ty" { if item, ok := v.(float64); ok { switch item { case 2: sensorResp.SensorType = "AE" case 3: sensorResp.SensorType = "CNT" case 4: sensorResp.SensorType = "CNI" default: sensorResp.SensorType = strconv.FormatFloat(item, 'f', -1, 64) } } else { log.Error("oneM2M_deserialize: Failed to process ", k) } } else { if item, ok := v.(string); ok { sensorResp.SensorCharacteristicList = append( sensorResp.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: string(item), }) } else if item, ok := v.(float64); ok { sensorResp.SensorCharacteristicList = append( sensorResp.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), }) } else if item, ok := v.(int64); ok { sensorResp.SensorCharacteristicList = append( sensorResp.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatInt(item, 10), }) } else if item, ok := v.(bool); ok { sensorResp.SensorCharacteristicList = append( sensorResp.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strconv.FormatBool(item), }) } else if item, ok := v.([]string); ok { sensorResp.SensorCharacteristicList = append( sensorResp.SensorCharacteristicList, SensorCharacteristic{ CharacteristicName: k, CharacteristicValue: strings.Join(item, ","), }) } else if item, ok := v.([]int64); ok { log.Error("oneM2M_deserialize: Failed to convert list of int64 into string: ", item) } else if item, ok := v.([]interface{}); ok { log.Debug("oneM2M_deserialize: Got []interface {} for ", k) log.Debug("oneM2M_deserialize: ValueOf ", reflect.ValueOf(item)) s := SensorCharacteristic{ CharacteristicName: k, } var buf bytes.Buffer fmt.Fprintf(&buf, "%T", item) s.CharacteristicValue = buf.String() sensorResp.SensorCharacteristicList = append(sensorResp.SensorCharacteristicList, s) } else { log.Error("oneM2M_deserialize: Failed to process: ", k) } } } // End of 'for' loop } // End of 'for' loop log.Debug("oneM2M_deserialize: sensorResp: ", sensorResp) return sensorResp, nil }