diff --git a/go-apps/meep-iot/go.mod b/go-apps/meep-iot/go.mod index 17227eec90432a99a2738eebc9d9a77eecc2e549..4f6a07d164719d26c5e57c2c58a7f9f1b0c5fec9 100644 --- a/go-apps/meep-iot/go.mod +++ b/go-apps/meep-iot/go.mod @@ -16,6 +16,7 @@ require ( github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis v0.0.0 github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-ctrl-client v0.0.0 github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-service-mgmt-client v0.0.0 + github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sss-mgr v0.0.0 // indirect github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-subscriptions v0.0.0 // indirect github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-swagger-api-mgr v0.0.0 github.com/gorilla/handlers v1.5.1 @@ -37,6 +38,7 @@ replace ( github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-redis => ../../go-packages/meep-redis github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sandbox-ctrl-client => ../../go-packages/meep-sandbox-ctrl-client github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-service-mgmt-client => ../../go-packages/meep-service-mgmt-client + github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sss-mgr => ../../go-packages/meep-sss-mgr github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-subscriptions => ../../go-packages/meep-subscriptions github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-swagger-api-mgr => ../../go-packages/meep-swagger-api-mgr github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-websocket => ../../go-packages/meep-websocket diff --git a/go-apps/meep-sss/sbi/sss-sbi.go b/go-apps/meep-sss/sbi/sss-sbi.go index c6f9f28bbb1342bed5bb80514da5ffe45b429fae..420ca6d17649ab78f57270fc11a990b94560044f 100644 --- a/go-apps/meep-sss/sbi/sss-sbi.go +++ b/go-apps/meep-sss/sbi/sss-sbi.go @@ -60,6 +60,11 @@ type SbiCfg struct { RedisAddr string InfluxAddr string Locality []string + Protocol string + Host string + Port int + HostId string + Name string DiscoveryNotify func() StatusNotify func() DataNotify func() @@ -79,6 +84,11 @@ type SssSbi struct { apiMgr *sam.SwaggerApiMgr activeModel *mod.Model sssMgr *tm.SssMgr + protocol string + host string + port int + hostId string + name string discoveryNotify func() statusNotify func() dataNotify func() @@ -103,6 +113,11 @@ func Init(cfg SbiCfg) (err error) { sbi.scenarioName = "" sbi.updateScenarioNameCB = cfg.ScenarioNameCb sbi.cleanUpCB = cfg.CleanUpCb + sbi.protocol = cfg.Protocol + sbi.host = cfg.Host + sbi.port = cfg.Port + sbi.hostId = cfg.HostId + sbi.name = cfg.Name sbi.discoveryNotify = cfg.DiscoveryNotify sbi.statusNotify = cfg.StatusNotify sbi.dataNotify = cfg.DataNotify @@ -149,7 +164,7 @@ func Init(cfg SbiCfg) (err error) { } // Connect to SSS Manager - sbi.sssMgr, err = tm.NewSssMgr(sbi.moduleName, sbi.sandboxName, sbi.discoveryNotify, sbi.statusNotify, sbi.dataNotify) + sbi.sssMgr, err = tm.NewSssMgr(sbi.moduleName, sbi.sandboxName, sbi.protocol, sbi.host, sbi.port, sbi.hostId, sbi.name, sbi.discoveryNotify, sbi.statusNotify, sbi.dataNotify) if err != nil { log.Error("Failed connection to SSS Manager: ", err) return err diff --git a/go-apps/meep-sss/server/meep-sss.go b/go-apps/meep-sss/server/meep-sss.go index 00cf66b4f1fd2766f17f8f59bc1d7b3570eb9dbd..cdbaff155a4b4bda6fbc9cfd6082b0fc90125820 100644 --- a/go-apps/meep-sss/server/meep-sss.go +++ b/go-apps/meep-sss/server/meep-sss.go @@ -112,6 +112,12 @@ var mutex sync.Mutex var expiryTicker *time.Ticker var nextSubscriptionIdAvailable int +var iot_platform_address string = "lab-oai.etsi.org" +var iot_platform_port int = 31110 +var cse_name string = "laboai-acme-ic-cse" +var iot_platform_id string = "7feaadbb0400" +var iot_platform_protocol string = "HTTP" + func getAppInstanceId() (id string, err error) { var appInfo scc.ApplicationInfo appInfo.Id = instanceId @@ -384,6 +390,11 @@ func Init() (err error) { Locality: locality, ScenarioNameCb: updateStoreName, CleanUpCb: cleanUp, + Protocol: iot_platform_protocol, + Host: iot_platform_address, + Port: iot_platform_port, + Name: cse_name, + HostId: iot_platform_id, DiscoveryNotify: discoveryNotify, StatusNotify: statusNotify, DataNotify: dataNotify, diff --git a/go-packages/meep-iot-mgr/go.mod b/go-packages/meep-iot-mgr/go.mod index e53e563593ab16896af2291d45c950d96ad328dd..d9b09212a86a776b391f2f2ab8bc2a2bbe6f034f 100644 --- a/go-packages/meep-iot-mgr/go.mod +++ b/go-packages/meep-iot-mgr/go.mod @@ -14,4 +14,6 @@ require ( ) -replace github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger +replace ( + github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger => ../../go-packages/meep-logger +) diff --git a/go-packages/meep-iot-mgr/go.sum b/go-packages/meep-iot-mgr/go.sum index 006aacd6c4a21fb65386d78a4b64559473586a18..d1cce7262a6e12b94827ec67b4ce44f690652c70 100644 --- a/go-packages/meep-iot-mgr/go.sum +++ b/go-packages/meep-iot-mgr/go.sum @@ -1,5 +1,6 @@ github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0= github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= +github.com/InterDigitalInc/AdvantEDGE v1.9.2 h1:CAcF+bn5m0Va2mHFL2lE4awU/kjuF6CjC05phiz8vnk= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE= github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk= diff --git a/go-packages/meep-iot-mgr/iot-mgr.go b/go-packages/meep-iot-mgr/iot-mgr.go index c025c51970351d73471dc302d0f709230968d0e0..d6c74abbe3d473b231f69bd28208d9aa6c5fb5ea 100644 --- a/go-packages/meep-iot-mgr/iot-mgr.go +++ b/go-packages/meep-iot-mgr/iot-mgr.go @@ -18,15 +18,20 @@ package iotmgr import ( "errors" + "sync" "time" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" + sssmgr "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-sss-mgr" ) // IOT Manager type IotMgr struct { - name string - namespace string + name string + namespace string + mutex sync.Mutex + wg sync.WaitGroup + refreshTicker *time.Ticker } type IotPlatformInfo struct { @@ -34,6 +39,7 @@ type IotPlatformInfo struct { UserTransportInfo []MbTransportInfo CustomServicesTransportInfo []TransportInfo Enabled bool + oneM2M *sssmgr.SssMgr } type MbTransportInfo struct { @@ -153,7 +159,7 @@ var devicesPerPlatformMap = map[string][]string{} // Map deviceIds pe var platformPerUserTransportIdMap = map[string][]string{} // Map userTransportId per platform // Timer to refresh devices list for all IoT platform -const refreshTickerExpeary = 10 // In seconds +const refreshTickerExpeary = 30 // In seconds // Enable profiling const profiling = false @@ -196,6 +202,7 @@ func (tm *IotMgr) init() { devicesMap = make(map[string]DeviceInfo, 0) devicesPerPlatformMap = make(map[string][]string, 0) platformPerUserTransportIdMap = make(map[string][]string, 0) + tm.refreshTicker = nil } // DeleteIotMgr - @@ -203,6 +210,46 @@ func (tm *IotMgr) DeleteIotMgr() (err error) { return nil } +func (tm *IotMgr) startRefreshTicker() { + log.Debug(">>> startRefreshTicker") + + tm.refreshTicker = time.NewTicker(time.Duration(refreshTickerExpeary) * time.Second) + go func() { + if tm.refreshTicker != nil { + for range tm.refreshTicker.C { + // Refresh the list of devices + tm.wg.Add(1) + log.Debug("startRefreshTicker: registeredIotPlatformsMap: ", registeredIotPlatformsMap) + for _, v := range registeredIotPlatformsMap { + if v.oneM2M != nil { + err := tm.populateDevicesPerIotPlatforms(v) + if err != nil { + log.Error(err) + } + } else { + log.Debug("startRefreshTicker: Nothing to do") + } + } // End of 'for' statement + log.Debug("startRefreshTicker: Before Done()") + tm.wg.Done() + log.Debug("startRefreshTicker: After Done()") + } // End of 'for' statement + log.Debug("startRefreshTicker: Leaving time loop") + } + }() +} + +func (tm *IotMgr) stopRefreshTicker() { + if tm.refreshTicker != nil { + // Refresh the list of devices + tm.wg.Add(1) + tm.refreshTicker.Stop() + tm.refreshTicker = nil + tm.wg.Done() + log.Debug("Refresh loop stopped") + } +} + func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err error) { if profiling { profilingTimers["RegisterIotPlatformInfo"] = time.Now() @@ -210,7 +257,29 @@ func (tm *IotMgr) RegisterIotPlatformInfo(iotPlatformInfo IotPlatformInfo) (err log.Info(">>> RegisterIotPlatformInfo: iotPlatformId: ", iotPlatformInfo) if iotPlatformInfo.Enabled { + //{{\"iotPlatformId\": \"1a584db5-6a3e-4f56-b126-29180069ecf1\", \"userTransportInfo\": [{\"id\": \"ca22ca5e-e0ce-4da8-a2ce-2966f4759032\", \"name\": \"MQTT\", \"description\": \"MQTT\", \"type\": \"MB_TOPIC_BASED\", \"protocol\": \"MQTT\", \"version\": \"2\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.56\", \"port\": 1883}]}, \"security\": {}, \"implSpecificInfo\": {}}], \"customServicesTransportInfo\": [{\"id\": \"85fe5e7f-c371-4f71-b7f6-61a1f808fbb3\", \"name\": \"/laboai-acme-ic-cse\", \"description\": \"ACME oneM2M CSE\", \"type\": \"REST_HTTP\", \"protocol\": \"REST_HTTP\", \"version\": \"4\", \"endpoint\": {\"addresses\": [{\"host\": \"172.29.10.20\", \"port\": 31110}]}, \"security\": {}}], \"enabled\": true}} + iotPlatformInfo.oneM2M = nil + if len(iotPlatformInfo.CustomServicesTransportInfo) == 0 || iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint == nil || len(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses) == 0 { + log.Warn("RegisterIotPlatformInfo: Cannot use provided CustomServicesTransportInfo") + } else { + // FIXME FSCOM How to get the CSE_ID + // TODO FSCOM Add notification support? + pltf, err := sssmgr.NewSssMgr(tm.name, tm.namespace, iotPlatformInfo.CustomServicesTransportInfo[0].Protocol /*"MQTT"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Host /*"172.29.10.56"*/, int(iotPlatformInfo.CustomServicesTransportInfo[0].Endpoint.Addresses[0].Port) /*1883*/, iotPlatformInfo.IotPlatformId /*"7feaadbb0400"*/, iotPlatformInfo.CustomServicesTransportInfo[0].Name /*"laboai-acme-ic-cse"*/, nil, nil, nil) + if err != nil { + log.Error("RegisterIotPlatformInfo: ", err) + iotPlatformInfo.oneM2M = nil + } else { + log.Info("RegisterIotPlatformInfo: IoT pltf created") + iotPlatformInfo.oneM2M = pltf + if tm.refreshTicker == nil { + log.Info("RegisterIotPlatformInfo: Start RefreshTicker") + tm.startRefreshTicker() + } + } + } registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId] = iotPlatformInfo + log.Info("RegisterIotPlatformInfo: iotPlatformId: ", registeredIotPlatformsMap[iotPlatformInfo.IotPlatformId]) + } // else, Skip disabled platform if profiling { @@ -233,7 +302,7 @@ func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { delete(devicesMap, dev) } // End of 'for' statement delete(devicesPerPlatformMap, iotPlatformId) - log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (before): ", iotPlatformId) + log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap) for _, rule := range platformPerUserTransportIdMap { for idx, pltf := range rule { if pltf == iotPlatformId { @@ -241,10 +310,23 @@ func (tm *IotMgr) DeregisterIotPlatformInfo(iotPlatformId string) (err error) { } } // End of 'for' statement } // End of 'for' statement - log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (after): ", iotPlatformId) + log.Info("DeregisterIotPlatformInfo: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap) } - if _, ok := registeredIotPlatformsMap[iotPlatformId]; ok { + if pltf, ok := registeredIotPlatformsMap[iotPlatformId]; ok { + if pltf.oneM2M != nil { + _ = pltf.oneM2M.DeleteSssMgr() + pltf.oneM2M = nil + log.Info("RegisterIotPlatformInfo: IoT pltf removed") + } delete(registeredIotPlatformsMap, iotPlatformId) + if len(registeredIotPlatformsMap) == 0 { + if tm.refreshTicker != nil { + log.Info("RegisterIotPlatformInfo: Stop RefreshTicker") + tm.stopRefreshTicker() + tm.refreshTicker = nil + } + + } } if profiling { @@ -261,6 +343,9 @@ func (tm *IotMgr) GetDevices() (devices []DeviceInfo, err error) { log.Info(">>> GetDevices") + tm.wg.Wait() + log.Info("GetDevices: After Wait()") + devices = make([]DeviceInfo, 0) if len(registeredIotPlatformsMap) == 0 { return devices, nil @@ -287,6 +372,9 @@ func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { log.Info(">>> GetDevice: deviceId: ", deviceId) + tm.wg.Wait() + log.Info("GetDevices: After Wait()") + if val, ok := devicesMap[deviceId]; !ok { err = errors.New("Wrong Device identifier") return device, err @@ -306,6 +394,9 @@ func (tm *IotMgr) GetDevice(deviceId string) (device DeviceInfo, err error) { func (tm *IotMgr) CreateDevice(device DeviceInfo) (deviceResp DeviceInfo, err error) { log.Info(">>> CreateDevice: ", device) + tm.wg.Wait() + log.Info("GetDevices: After Wait()") + // RequestedMecTrafficRule is not supported yet if len(device.RequestedMecTrafficRule) != 0 { err = errors.New("Unsupported traffic rule provided") @@ -333,6 +424,9 @@ func (tm *IotMgr) DeleteDevice(deviceId string) (err error) { log.Info(">>> DeleteDevice: device: ", deviceId) + tm.wg.Wait() + log.Info("GetDevices: After Wait()") + if _, ok := devicesMap[deviceId]; !ok { err = errors.New("Invalid device identifier") log.Error(err.Error()) @@ -378,6 +472,40 @@ func (tm *IotMgr) createDeviceWithIotPlatformId(device DeviceInfo, requestedIotP return deviceResp, err } + if registeredIotPlatformsMap[requestedIotPlatformId].oneM2M != nil && device.Enabled == true { + log.Info("createDeviceWithIotPlatformId: Create device on IoT platform", device) + var sensor = sssmgr.SensorDiscoveryInfo{ + SensorIdentifier: device.DeviceId, + SensorType: "CNT", // FIXME FSCOM How to retrieve this info + SensorPosition: nil, + IotPlatformId: requestedIotPlatformId, + } + if len(device.DeviceMetadata) != 0 { + sensor.SensorCharacteristicList = make([]sssmgr.SensorCharacteristic, len(device.DeviceMetadata)) + for i, c := range device.DeviceMetadata { + sensor.SensorCharacteristicList[i] = sssmgr.SensorCharacteristic{CharacteristicName: c.Key, CharacteristicValue: c.Value} + } // End of 'for' statement + } + // FIXME FSCOM How to manage these fields from DeviceInfo + // DeviceAuthenticationInfo string + // Gpsi string + // Pei string + // Supi string + // Msisdn string + // Imei string + // Imsi string + // Iccid string + // RequestedMecTrafficRule []TrafficRuleDescriptor + // //DeviceSpecificMessageFormats *DeviceSpecificMessageFormats + // //DownlinkInfo *DownlinkInfo + // ClientCertificate string + // } + sensor, err := registeredIotPlatformsMap[requestedIotPlatformId].oneM2M.OneM2M_create(sensor, requestedIotPlatformId, sensor.SensorType) + if err != nil { + return deviceResp, err + } + } + devicesMap[device.DeviceId] = device devicesPerPlatformMap[device.DeviceId] = append(devicesPerPlatformMap[device.DeviceId], requestedIotPlatformId) platformPerUserTransportIdMap[requestedIotPlatformId] = append(platformPerUserTransportIdMap[requestedIotPlatformId], device.RequestedUserTransportId) @@ -404,3 +532,67 @@ func (tm *IotMgr) createDeviceWithRequestedUserTransportId(device DeviceInfo, re return deviceResp, nil } + +func (tm *IotMgr) resetMaps(iotPlatformId string) { + log.Info(">>> resetMaps: ", iotPlatformId) + + // Free resources from devicesMap map + // Remove all devices for this IoT platform + log.Info("resetMaps: devicesMap (before): ", devicesMap) + for _, deviceId := range devicesPerPlatformMap[iotPlatformId] { + delete(devicesMap, deviceId) + } // End of 'for' statement + log.Info("resetMaps: devicesMap (after): ", devicesMap) + + // Remove all devices for this IoT platform + log.Info("resetMaps: devicesPerPlatformMap (before): ", devicesPerPlatformMap) + delete(devicesPerPlatformMap, iotPlatformId) + log.Info("resetMaps: devicesPerPlatformMap (after): ", devicesPerPlatformMap) + + log.Info("resetMaps: platformPerUserTransportIdMap (before): ", platformPerUserTransportIdMap) + for _, rule := range platformPerUserTransportIdMap { + for idx, pltf := range rule { + if pltf == iotPlatformId { + rule = append(rule[:idx], rule[idx+1:]...) + } + } // End of 'for' statement + } // End of 'for' statement + log.Info("resetMaps: platformPerUserTransportIdMap (after): ", platformPerUserTransportIdMap) +} + +func (tm *IotMgr) populateDevicesPerIotPlatforms(iotPlatformInfo IotPlatformInfo) (err error) { + log.Info(">>> populateDevicesPerIotPlatforms: ", iotPlatformInfo) + + if iotPlatformInfo.oneM2M == nil { + log.Info("populateDevicesPerIotPlatforms: Nothing to do") + return nil + } + + // Reset maps + tm.resetMaps(iotPlatformInfo.IotPlatformId) + + sensors, err := iotPlatformInfo.oneM2M.SensorDiscoveryInfoAll() + if err != nil { + log.Error("populateDevicesPerIotPlatforms: ", err) + return err + } + log.Info("populateDevicesPerIotPlatforms: sensors: ", sensors) + + for _, sensor := range sensors { + var deviceInfo = DeviceInfo{ + DeviceId: sensor.SensorIdentifier, + Enabled: true, + } + deviceInfo.DeviceMetadata = make([]KeyValuePair, len(sensor.SensorCharacteristicList)) + for i, c := range sensor.SensorCharacteristicList { + deviceInfo.DeviceMetadata[i] = KeyValuePair{Key: c.CharacteristicName, Value: c.CharacteristicValue} + } // End of 'for' statement + + devicesMap[deviceInfo.DeviceId] = deviceInfo + devicesPerPlatformMap[iotPlatformInfo.IotPlatformId] = append(devicesPerPlatformMap[iotPlatformInfo.IotPlatformId], deviceInfo.DeviceId) + } // End of 'for' statement + log.Info("populateDevicesPerIotPlatforms: devicesMap: ", devicesMap) + log.Info("populateDevicesPerIotPlatforms: devicesPerPlatformMap: ", devicesPerPlatformMap) + + return nil +} diff --git a/go-packages/meep-sss-mgr/http.go b/go-packages/meep-sss-mgr/http.go index 5c8d5f3828d59a3e98d6081948e5b5a5d88da1c9..cc6ee1d8822c107093dce850b8734bf29939644c 100644 --- a/go-packages/meep-sss-mgr/http.go +++ b/go-packages/meep-sss-mgr/http.go @@ -96,6 +96,7 @@ func (http_mgr *SssMgrHttp) send(p_ctx SssMgrBindingProtocolContext) (err error, } // Finalize the body log.Debug("send: url=", url) + var response []byte if p_ctx.body != nil { // With body log.Debug("send: p_ctx.body=", p_ctx.body) body, err := json.Marshal(p_ctx.body) @@ -104,26 +105,20 @@ func (http_mgr *SssMgrHttp) send(p_ctx SssMgrBindingProtocolContext) (err error, return err, nil } log.Debug("send: Request body: ", string(body)) - response, err := sendRequest(method, url, headers, bytes.NewBuffer(body), nil, p_ctx.queries, 201) + response, err = sendRequest(method, url, headers, bytes.NewBuffer(body), nil, p_ctx.queries, 201) if err != nil { log.Error("send: ", err.Error()) return err, nil } - log.Debug("send: response: ", string(response)) - err = json.Unmarshal(response, &resp) - if err != nil { - log.Error("send: ", err.Error()) - return err, nil - } - log.Debug("send: response: ", resp) - log.Debug("send: TypeOf(response): ", reflect.TypeOf(resp)) } else { // Without body - response, err := sendRequest(method, url, headers, nil, nil, p_ctx.queries, p_ctx.code) + response, err = sendRequest(method, url, headers, nil, nil, p_ctx.queries, p_ctx.code) if err != nil { log.Error("send: ", err.Error()) return err, nil } - log.Debug("send: response: ", string(response)) + } + log.Debug("send: response: ", string(response)) + if len(response) != 0 { err = json.Unmarshal(response, &resp) if err != nil { log.Error("send: ", err.Error()) diff --git a/go-packages/meep-sss-mgr/onem2m-mgr.go b/go-packages/meep-sss-mgr/onem2m-mgr.go index 237b9e97d0612b977cb0ff50223d6d6849c4c9e5..ab5127e6f6af93db8d2ffbf8df7d81ee2389fec5 100644 --- a/go-packages/meep-sss-mgr/onem2m-mgr.go +++ b/go-packages/meep-sss-mgr/onem2m-mgr.go @@ -1,5 +1,5 @@ /* - * Copyright (c) 2024 The AdvantEDGE Authors + * Copyright (c) 2024-2025 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -40,6 +40,8 @@ type SssMgr struct { bindingProtocol string host string port int + cse_name string + hostId string mutex sync.Mutex wg sync.WaitGroup refreshTicker *time.Ticker @@ -87,17 +89,10 @@ const profiling = false var profilingTimers map[string]time.Time -const ( - iot_platform_address = "lab-oai.etsi.org" - iot_platform_port = 31110 - iot_platform_name = "laboai-acme-ic-cse" - iot_platform_id = "7feaadbb0400" -) - var protocol SssMgrBindingProtocol // NewSssMgr - Creates and initializes a new SSS Traffic Manager -func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) { +func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, hostId string, cse_name string, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err @@ -115,6 +110,8 @@ func NewSssMgr(name string, namespace string, bindingProtocol string, host strin tm.bindingProtocol = bindingProtocol tm.host = host tm.port = port + tm.cse_name = cse_name + tm.hostId = hostId if tm.bindingProtocol == "MQTT" { if tm.host == "" { err := errors.New("Host not set for MQTTP protocol") @@ -125,13 +122,27 @@ func NewSssMgr(name string, namespace string, bindingProtocol string, host strin tm.port = 1883 } protocol = NewSssMgrMqtt() - } else if tm.bindingProtocol == "HTTP" { + } else if tm.bindingProtocol == "REST_HTTP" { + if tm.port == 0 { + tm.port = 80 + } protocol = NewSssMgrHttp() } else { err := errors.New("Binding protocol not set") log.Error(err.Error()) return nil, err } + if hostId == "" { + err := errors.New("hostId not set") + log.Error(err.Error()) + return nil, err + } + if cse_name == "" { + err := errors.New("cse_name not set") + log.Error(err.Error()) + return nil, err + } + err = protocol.init(tm) if err != nil { log.Error(err.Error()) @@ -154,11 +165,11 @@ func (tm *SssMgr) init() { } registeredIotPlatformsMap = make(map[string]IotPlatformInfo, 1) - registeredIotPlatformsMap[iot_platform_address] = IotPlatformInfo{ // FIXME FSCOM How to register IoT platform to meep-sss (see meep-iot?) - Address: iot_platform_address, - Port: iot_platform_port, - Name: iot_platform_name, - IotPlatformId: iot_platform_id, + registeredIotPlatformsMap[tm.hostId] = IotPlatformInfo{ + Address: tm.host, + Port: tm.port, + Name: tm.cse_name, + IotPlatformId: tm.hostId, } sensorsMap = make(map[string]SensorDiscoveryInfo, 0) sensorsPerPlatformMap = make(map[string][]string, 0) @@ -211,7 +222,7 @@ func (tm *SssMgr) stopRefreshTicker() { func (tm *SssMgr) SensorDiscoveryInfoAll() (sensors []SensorDiscoveryInfo, err error) { if profiling { - profilingTimers["GetDevices"] = time.Now() + profilingTimers["SensorDiscoveryInfoAll"] = time.Now() } log.Info(">>> SensorDiscoveryInfoAll") @@ -237,7 +248,7 @@ func (tm *SssMgr) SensorDiscoveryInfoAll() (sensors []SensorDiscoveryInfo, err e if profiling { now := time.Now() - log.Debug("SensorDiscoveryInfoAll: ", now.Sub(profilingTimers["GetDevices"])) + log.Debug("SensorDiscoveryInfoAll: ", now.Sub(profilingTimers["SensorDiscoveryInfoAll"])) } return sensors, nil @@ -337,7 +348,7 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { ctx.queries = queries err, resp := protocol.send(ctx) if err != nil { - log.Error("oneM2M_create: ", err.Error()) + log.Error("OneM2M_create: ", err.Error()) return err } log.Debug("populateSensors: resp: ", resp) @@ -359,7 +370,7 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { ctx.queries["fu"] = "2" err, resp := protocol.send(ctx) if err != nil { - log.Error("oneM2M_create: ", err.Error()) + log.Error("OneM2M_create: ", err.Error()) continue } log.Debug("populateSensors: resp: ", resp) @@ -660,141 +671,124 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { return nil } -func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (sensorResp SensorDiscoveryInfo, err error) { - // FIXME FSCOM: requestedIotPlatformId should be useless +func (tm *SssMgr) OneM2M_create(sensor SensorDiscoveryInfo, path string) (sensorResp SensorDiscoveryInfo, err error) { + + if profiling { + profilingTimers["OneM2M_create"] = time.Now() + } + + log.Info(">>> OneM2M_create: sensor=", sensor) + + if sensor.IotPlatformId == "" { + err = errors.New("IotPlatformId fiels shall be set") + log.Error("OneM2M_create: ", err.Error()) + return sensorResp, err + } + + tm.wg.Wait() + log.Info("OneM2M_create: After Synchro") // Create the initial payload dictionary var bodyMap = map[string]map[string]interface{}{} // Initialize the entry - if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer + if sensor.SensorType == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer bodyMap["m2m:ae"] = make(map[string]interface{}, 0) - bodyMap["m2m:ae"]["api"] = "Norg.etsi." + requestedIotPlatformId + "." + sensor.SensorIdentifier + bodyMap["m2m:ae"]["api"] = "Norg.etsi." + sensor.IotPlatformId + "." + sensor.SensorIdentifier bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier bodyMap["m2m:ae"]["rr"] = false bodyMap["m2m:ae"]["srv"] = []string{"4"} - } else if type_ == "CNT" { + // Add metadata + if len(sensor.SensorCharacteristicList) != 0 { + for _, val := range sensor.SensorCharacteristicList { + log.Debug("OneM2M_create: Adding AE metadata: ", val) + bodyMap["m2m:ae"][val.CharacteristicName] = val.CharacteristicValue + } // End of 'for' statement + } + } else if sensor.SensorType == "CNT" { bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) bodyMap["m2m:cnt"]["mbs"] = 10000 bodyMap["m2m:cnt"]["mni"] = 10 bodyMap["m2m:cnt"]["rn"] = sensor.SensorIdentifier - bodyMap["m2m:cnt"]["srv"] = []string{"4"} // Add metadata if len(sensor.SensorCharacteristicList) != 0 { for _, val := range sensor.SensorCharacteristicList { - log.Debug("oneM2M_create: Adding CNT metadata: ", val) - // FIXME FSCOM Add metadata + log.Debug("OneM2M_create: Adding CNT metadata: ", val) + bodyMap["m2m:cnt"][val.CharacteristicName] = val.CharacteristicValue + } // End of 'for' statement + } + } else if sensor.SensorType == "CNI" { + bodyMap["m2m:cni"] = make(map[string]interface{}, 0) + bodyMap["m2m:cni"]["cnf"] = "text/plain:0" + bodyMap["m2m:cni"]["rn"] = sensor.SensorIdentifier + // Add metadata + if len(sensor.SensorCharacteristicList) != 0 { + for _, val := range sensor.SensorCharacteristicList { + log.Debug("OneM2M_create: Adding CNI metadata: ", val) + bodyMap["m2m:cni"][val.CharacteristicName] = val.CharacteristicValue } // End of 'for' statement } } else { - err = errors.New("oneM2M_create: Invalid type") - log.Error("oneM2M_create: ", err.Error()) + err = errors.New("OneM2M_create: Invalid type") + log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } // Send it and get the result var ctx = SssMgrBindingProtocolContext{ - host: registeredIotPlatformsMap[requestedIotPlatformId].Address, - port: registeredIotPlatformsMap[requestedIotPlatformId].Port, - name: registeredIotPlatformsMap[requestedIotPlatformId].Name, - to: registeredIotPlatformsMap[requestedIotPlatformId].Name, - from: requestedIotPlatformId, + host: registeredIotPlatformsMap[sensor.IotPlatformId].Address, + port: registeredIotPlatformsMap[sensor.IotPlatformId].Port, + name: registeredIotPlatformsMap[sensor.IotPlatformId].Name, + to: registeredIotPlatformsMap[sensor.IotPlatformId].Name, + from: sensor.IotPlatformId, op: 1, // CREATE rqi: uuid.New().String(), rvi: []string{"4"}, // FIXME FSCOM How to get it body: bodyMap, code: 201, } - if type_ == "AE" { + if path != "" { + ctx.to = path + } + if sensor.SensorType == "AE" { ctx.ty = 2 - } else if type_ == "CNT" { + } else if sensor.SensorType == "CNT" { + ctx.ty = 3 + } else if sensor.SensorType == "CNI" { ctx.ty = 4 } else { - err = errors.New("oneM2M_create: Invalid type") + err = errors.New("OneM2M_create: Invalid type") log.Error("send: ", err.Error()) return sensorResp, err } - //var resp = map[string]map[string]interface{}{} err, resp := protocol.send(ctx) if err != nil { - log.Error("oneM2M_create: ", err.Error()) + log.Error("OneM2M_create: ", err.Error()) return sensorResp, err } + log.Debug("OneM2M_create: resp: ", resp) + log.Debug("OneM2M_create: TypeOf(resp): ", reflect.TypeOf(resp)) + if _, ok := resp.(map[string]interface{}); !ok { + log.Error("OneM2M_create: Interface not available") - // Build the headers - // var headers = http.Header{} - // headers["Accept"] = []string{headerAccept} - // headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it - // headers["X-M2M-RI"] = []string{uuid.New().String()} - // headers["X-M2M-RVI"] = []string{"4"} - // var s string - // if type_ == "AE" { - // s = headerContentType + ";ty=2" - // } else if type_ == "CNT" { - // s = headerContentType + ";ty=4" - // } - // headers["Content-Type"] = []string{s} - // // Build the url and the body - // var url string - // var bodyMap = map[string]map[string]interface{}{} - // Initialize the entry - // if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer - // bodyMap["m2m:ae"] = make(map[string]interface{}, 0) - // bodyMap["m2m:ae"]["api"] = "Norg.etsi." + requestedIotPlatformId + "." + sensor.SensorIdentifier - // bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier - // bodyMap["m2m:ae"]["rr"] = false - // bodyMap["m2m:ae"]["srv"] = []string{"4"} - // url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name - // } else if type_ == "CNT" { - // bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) - // bodyMap["m2m:cnt"]["mbs"] = 10000 - // bodyMap["m2m:cnt"]["mni"] = 10 - // bodyMap["m2m:cnt"]["rn"] = sensor.SensorIdentifier - // bodyMap["m2m:cnt"]["srv"] = []string{"4"} - // // Add metadata - // if len(sensor.SensorCharacteristicList) != 0 { - // for _, val := range sensor.SensorCharacteristicList { - // log.Debug("oneM2M_create: Adding CNT metadata: ", val) - // // FIXME FSCOM Add metadata - // } // End of 'for' statement - // } - // url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name - // } else { - // err = errors.New("oneM2M_create: Invalid type") - // log.Error("oneM2M_create: ", err.Error()) - // return sensorResp, err - // } - // log.Debug("oneM2M_create: url=", url) - // log.Debug("oneM2M_create: bodyMap=", bodyMap) - // body, err := json.Marshal(bodyMap) - // if err != nil { - // log.Error("oneM2M_create: ", err.Error()) - // return sensorResp, err - // } - // log.Debug("oneM2M_create: Request body: ", string(body)) - // Send the request - // response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) - // if err != nil { - // log.Error("oneM2M_create: ", err.Error()) - // return sensorResp, err - // } - // log.Debug("oneM2M_create: response: ", string(response)) - // var d map[string]map[string]interface{} - // err = json.Unmarshal(response, &d) - // if err != nil { - // log.Error("oneM2M_create: ", err.Error()) - // return sensorResp, err - // } - log.Debug("oneM2M_create: d: ", resp) - log.Debug("oneM2M_create: TypeOf(d): ", reflect.TypeOf(resp)) + } // Add additional entries - // sensorResp, err = tm.oneM2M_deserialize(sensorResp, resp) - // if err != nil { - // log.Error("oneM2M_create: ", err.Error()) - // return sensorResp, err - // } - // log.Debug("oneM2M_create: sensorResp: ", sensorResp) + sensorResp.SensorIdentifier = sensor.SensorIdentifier + sensorResp.SensorType = sensor.SensorType + sensorResp.IotPlatformId = sensor.IotPlatformId + sensorResp.SensorPosition = sensor.SensorPosition + sensorResp, err = tm.oneM2M_deserialize(sensorResp, resp.(map[string]interface{})) + if err != nil { + log.Error("OneM2M_create: ", err.Error()) + return sensorResp, err + } + log.Debug("OneM2M_create: sensorResp: ", sensorResp) + + if profiling { + now := time.Now() + log.Debug("OneM2M_create: ", now.Sub(profilingTimers["OneM2M_create"])) + } return sensorResp, nil } @@ -839,12 +833,13 @@ func (tm *SssMgr) oneM2M_discovery(sensor SensorDiscoveryInfo, requestedIotPlatf } log.Debug("oneM2M_discovery: d: ", d) // Add additional entries - sensorResp, err = tm.oneM2M_deserialize(sensor, d) - if err != nil { - log.Error("oneM2M_discovery: ", err.Error()) - return sensorResp, err - } - // log.Debug("oneM2M_discovery: sensorResp: ", sensorResp) + // sensorResp, err = tm.oneM2M_deserialize(sensor, d) + // if err != nil { + // log.Error("oneM2M_discovery: ", err.Error()) + // return sensorResp, err + // } + log.Debug("oneM2M_discovery: sensorResp: ", sensorResp) + return sensorResp, nil } @@ -897,11 +892,11 @@ func (tm *SssMgr) oneM2M_get(sensor SensorDiscoveryInfo, requestedIotPlatformId log.Debug("oneM2M_get: d: ", d) // Add additional entries - sensorResp, err = tm.oneM2M_deserialize(sensor, d) - if err != nil { - log.Error("oneM2M_get: ", err.Error()) - return sensorResp, err - } + // sensorResp, err = tm.oneM2M_deserialize(sensor, d) + // if err != nil { + // log.Error("oneM2M_get: ", err.Error()) + // return sensorResp, err + // } log.Debug("oneM2M_get: sensorResp: ", sensorResp) return sensorResp, nil } @@ -954,33 +949,54 @@ func (tm *SssMgr) oneM2M_subscribe(sensor SensorDiscoveryInfo, requestedIotPlatf return "", err /*nil*/ } -func (tm *SssMgr) oneM2M_delete(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (err error) { +func (tm *SssMgr) OneM2M_Delete(sensor SensorDiscoveryInfo) (err error) { // FIXME FSCOM: requestedIotPlatformId should be useless + if profiling { + profilingTimers["OneM2M_Delete"] = time.Now() + } + + log.Info(">>> OneM2M_Delete: sensor=", sensor) + if sensor.SensorIdentifier == "" { - err = errors.New("oneM2M_delete: Cannot find \"ri\" value") - log.Error("oneM2M_delete: ", err.Error()) + err = errors.New("OneM2M_Delete: Cannot find \"ri\" value") + log.Error("OneM2M_Delete: ", err.Error()) return err } - // Build the URL - url := "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + sensor.SensorIdentifier - log.Debug("oneM2M_delete: url=", url) - // Build the headers - var headers = http.Header{} - headers["Accept"] = []string{headerAccept} - headers["Content-Type"] = []string{headerContentType} - headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it - headers["X-M2M-RI"] = []string{uuid.New().String()} - headers["X-M2M-RI"] = []string{uuid.New().String()} - headers["X-M2M-RVI"] = []string{"4"} - // Send the request - _, err = sendRequest("DELETE", url, headers, nil, nil, nil, 200) + if sensor.IotPlatformId == "" { + err = errors.New("IotPlatformId fiels shall be set") + log.Error("OneM2M_Delete: ", err.Error()) + return err + } + + tm.wg.Wait() + log.Info("OneM2M_Delete: After Synchro") + + // Send it and get the result + var ctx = SssMgrBindingProtocolContext{ + host: registeredIotPlatformsMap[sensor.IotPlatformId].Address, + port: registeredIotPlatformsMap[sensor.IotPlatformId].Port, + name: registeredIotPlatformsMap[sensor.IotPlatformId].Name, + to: sensor.SensorIdentifier, + from: sensor.IotPlatformId, + op: 4, // DELETE + ty: -1, + rqi: uuid.New().String(), + rvi: []string{"4"}, // FIXME FSCOM How to get it + code: 200, + } + err, _ = protocol.send(ctx) if err != nil { - log.Error("oneM2M_delete: ", err.Error()) + log.Error("OneM2M_Delete: ", err.Error()) return err } + if profiling { + now := time.Now() + log.Debug("OneM2M_Delete: ", now.Sub(profilingTimers["OneM2M_Delete"])) + } + return nil } @@ -989,38 +1005,44 @@ func (tm *SssMgr) oneM2M_subscribe_discovery_event(requestedIotPlatformId string return nil } -func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[string]map[string]interface{}) (sensorResp SensorDiscoveryInfo, err error) { +func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[string]interface{}) (sensorResp SensorDiscoveryInfo, err error) { sensorResp = sensor // Same data structure - log.Debug("oneM2M_deserialize: type(response): ", reflect.TypeOf(response)) - log.Debug("oneM2M_deserialize: len(response): ", len(response)) - log.Debug("oneM2M_deserialize: response: ", response) - - if val, ok := response["m2m:ae"]; ok { - log.Debug("oneM2M_deserialize: val: ", val) - } else { - log.Error("oneM2M_deserialize: CharacteristicName not found") - } + log.Debug(">>> oneM2M_deserialize: response: ", response) for i, m := range response { log.Debug("==> ", i, " value is ", m) + if _, ok := m.(map[string]interface{}); !ok { + // Skip it + log.Warn("oneM2M_deserialize: m is not map[string]interface{}") + continue + } // m is a map[string]interface. // loop over keys and values in the map. - for k, v := range m { + for k, v := range m.(map[string]interface{}) { log.Debug(k, " value is ", v) log.Debug("oneM2M_deserialize: type(v): ", reflect.TypeOf(v)) if k == "ri" { if item, ok := v.(string); ok { - sensor.SensorIdentifier = item + sensorResp.SensorIdentifier = item } else { - log.Error("populateSensors: Failed to process ", k) + log.Error("oneM2M_deserialize: Failed to process ", k) } } else if k == "ty" { - if item, ok := v.(string); ok { - sensor.SensorType = item + if item, ok := v.(float64); ok { + switch item { + case 2: + sensorResp.SensorType = "AE" + case 3: + sensorResp.SensorType = "CNT" + case 4: + sensorResp.SensorType = "CNI" + default: + sensorResp.SensorType = strconv.FormatFloat(item, 'f', -1, 64) + } } else { - log.Error("populateSensors: Failed to process ", k) + log.Error("oneM2M_deserialize: Failed to process ", k) } } else { if item, ok := v.(string); ok { @@ -1061,15 +1083,15 @@ func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[st } else if item, ok := v.([]int64); ok { log.Error("oneM2M_deserialize: Failed to convert list of int64 into string: ", item) } else if item, ok := v.([]interface{}); ok { - log.Debug("populateSensors: Got []interface {} for ", k) - log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) + log.Debug("oneM2M_deserialize: Got []interface {} for ", k) + log.Debug("oneM2M_deserialize: ValueOf ", reflect.ValueOf(item)) s := SensorCharacteristic{ CharacteristicName: k, } var buf bytes.Buffer fmt.Fprintf(&buf, "%T", item) s.CharacteristicValue = buf.String() - sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) + sensorResp.SensorCharacteristicList = append(sensorResp.SensorCharacteristicList, s) } else { log.Error("oneM2M_deserialize: Failed to process: ", k) } @@ -1077,6 +1099,7 @@ func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[st } // End of 'for' loop } // End of 'for' loop + log.Debug("oneM2M_deserialize: sensorResp: ", sensorResp) return sensorResp, nil } diff --git a/go-packages/meep-sss-mgr/onem2m-mgr_test.go b/go-packages/meep-sss-mgr/onem2m-mgr_test.go index 6e60cf7709cc18c1a0bb4eac37e076bbfa7fa193..2bda18bb4a6ca7845d0503a80d7393d9ec3aaa73 100644 --- a/go-packages/meep-sss-mgr/onem2m-mgr_test.go +++ b/go-packages/meep-sss-mgr/onem2m-mgr_test.go @@ -33,22 +33,30 @@ func TestNewSssMgr(t *testing.T) { // Invalid Connector fmt.Println("Invalid SSS Asset Manager") - tm, err := NewSssMgr("", tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) + tm, err := NewSssMgr("", tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err == nil || tm != nil { t.Fatalf("Service name not set") } - tm, err = NewSssMgr(tmName, tmNamespace, "", "172.29.10.56", 1883, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err == nil || tm != nil { t.Fatalf("Binding protocol not set") } - tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "", 1883, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err == nil || tm != nil { - t.Fatalf("Binding protocol not set") + t.Fatalf("Host not set") + } + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "", "laboai-acme-ic-cse", nil, nil, nil) + if err == nil || tm != nil { + t.Fatalf("Host id not set") + } + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "", nil, nil, nil) + if err == nil || tm != nil { + t.Fatalf("CSE name not set") } // Valid Connector fmt.Println("Create valid SSS Asset Manager") - tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } @@ -60,7 +68,7 @@ func TestNewSssMgr(t *testing.T) { tm = nil fmt.Println("Create valid SSS Asset Manager") - tm, err = NewSssMgr(tmName, tmNamespace, "HTTP", "172.29.10.56", 1883, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "REST_HTTP", "172.29.10.56", 31110, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } @@ -78,7 +86,7 @@ func TestNewSssMgr(t *testing.T) { // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "", 0, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } @@ -101,7 +109,7 @@ func TestNewSssMgr(t *testing.T) { // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "", 0, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } @@ -127,7 +135,7 @@ func TestNewSssMgr(t *testing.T) { // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "lab-oai.etsi.org", 31110, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } @@ -155,80 +163,122 @@ func TestNewSssMgr(t *testing.T) { // tm = nil // } -// func TestPopulateDevicesPerIotPlatformsMqtt(t *testing.T) { +// func TestOneM2M_createAEHttp(t *testing.T) { // fmt.Println("--- ", t.Name()) // log.MeepTextLogInit(t.Name()) + // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "lab-oai.etsi.org", 31110, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } -// err = tm.populateDevicesPerIotPlatforms() -// if err != nil { -// t.Fatalf(err.Error()) + +// var new_sensor = SensorDiscoveryInfo{ +// SensorIdentifier: "12345", +// SensorType: "AE", +// SensorPosition: nil, +// IotPlatformId: "7feaadbb0400", // } -// // Cleanup -// err = tm.DeleteSssMgr() +// sensor, err := tm.OneM2M_create(new_sensor, "") // if err != nil { -// t.Fatalf("Failed to cleanup SSS Asset Manager") +// t.Fatalf("Failed to create new sensor") // } -// tm = nil -// } - -// func TestSensorDiscoveryInfoAllMqtt(t *testing.T) { -// fmt.Println("--- ", t.Name()) -// log.MeepTextLogInit(t.Name()) -// // Valid Connector -// fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) -// if err != nil || tm == nil { -// t.Fatalf("Failed to create SSS Asset Manager") +// // Verify content +// fmt.Println(">>> validate_sensor_discovery_info_1: expected_sensor: ", new_sensor) +// fmt.Println(">>> validate_sensor_discovery_info_1: received_sensor: ", sensor) +// // if sensor.SensorIdentifier != new_sensor.SensorIdentifier { // FIXME FSCOM SensorIdentifier will be replaced by oneM2M ri +// // t.Fatalf("received_sensor.SensorIdentifier != SensorIdentifier") +// // } +// if sensor.SensorType != new_sensor.SensorType { +// t.Fatalf("received_sensor.SensorType != SensorType") // } - -// err = tm.populateDevicesPerIotPlatforms() -// if err != nil { -// t.Fatalf(err.Error()) +// if sensor.IotPlatformId != new_sensor.IotPlatformId { +// t.Fatalf("received_sensor.IotPlatformId != IotPlatformId") // } - -// sensors, err := tm.SensorDiscoveryInfoAll() -// if err != nil { -// t.Fatalf(err.Error()) +// if len(sensor.SensorCharacteristicList) == 0 { +// t.Fatalf("received_sensor.SensorCharacteristicList shall not be empty") // } -// fmt.Println("sensors: ", sensors) + +// _ = tm.OneM2M_Delete(sensor) // // Cleanup // err = tm.DeleteSssMgr() // if err != nil { // t.Fatalf("Failed to cleanup SSS Asset Manager") // } +// tm = nil // } -func TestGetSensorMqtt(t *testing.T) { +func TestOneM2M_createAE_CNTHttp(t *testing.T) { fmt.Println("--- ", t.Name()) log.MeepTextLogInit(t.Name()) // Valid Connector fmt.Println("Create valid SSS Asset Manager") - tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) + tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "lab-oai.etsi.org", 31110, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } - sensors, err := tm.SensorDiscoveryInfoAll() + var sensor_ae = SensorDiscoveryInfo{ + SensorIdentifier: "CMyAE", + SensorType: "AE", + SensorPosition: nil, + IotPlatformId: "7feaadbb0400", + } + new_sensor_ae, err := tm.OneM2M_create(sensor_ae, "") if err != nil { - t.Fatalf(err.Error()) + t.Fatalf("Failed to create new AE sensor") + } + + // Verify content + fmt.Println(">>> validate_sensor_discovery_info_1: expected_sensor: ", sensor_ae) + fmt.Println(">>> validate_sensor_discovery_info_1: received_sensor: ", new_sensor_ae) + // if sensor.SensorIdentifier != new_sensor.SensorIdentifier { // FIXME FSCOM SensorIdentifier will be replaced by oneM2M ri + // t.Fatalf("received_sensor.SensorIdentifier != SensorIdentifier") + // } + if new_sensor_ae.SensorType != new_sensor_ae.SensorType { + t.Fatalf("received_sensor.SensorType != SensorType") + } + if new_sensor_ae.IotPlatformId != sensor_ae.IotPlatformId { + t.Fatalf("received_sensor.IotPlatformId != IotPlatformId") + } + if len(new_sensor_ae.SensorCharacteristicList) == 0 { + t.Fatalf("received_sensor.SensorCharacteristicList shall not be empty") } - for _, v := range sensors { - fmt.Println("v", v) - fmt.Println("TypeOf(v)", reflect.TypeOf(v)) + var sensor_cnt = SensorDiscoveryInfo{ + SensorIdentifier: "CMyCNT", + SensorType: "CNT", + SensorPosition: nil, + IotPlatformId: "7feaadbb0400", + } + // sensor_cnt.SensorCharacteristicList = make([]SensorCharacteristic, 1) + // sensor_cnt.SensorCharacteristicList[0] = SensorCharacteristic{CharacteristicName: "con", CharacteristicValue: "OFF"} + sensorPath := new_sensor_ae.SensorIdentifier + new_sensor_cnt, err := tm.OneM2M_create(sensor_cnt, sensorPath) + if err != nil { + t.Fatalf("Failed to create new sensor") + } + if new_sensor_cnt.SensorType != sensor_cnt.SensorType { + t.Fatalf("received_sensor.SensorType != SensorType") + } + if new_sensor_cnt.IotPlatformId != sensor_cnt.IotPlatformId { + t.Fatalf("received_sensor.IotPlatformId != IotPlatformId") + } + if len(new_sensor_cnt.SensorCharacteristicList) == 0 { + t.Fatalf("received_sensor.SensorCharacteristicList shall not be empty") + } - sensor, err := tm.GetSensor(v.SensorIdentifier) - if !validate_sensor_discovery_info(v, sensor) { - t.Fatalf(err.Error()) - } + err = tm.OneM2M_Delete(new_sensor_cnt) + if err != nil { + t.Fatalf("Failed to create new sensor") + } + err = tm.OneM2M_Delete(new_sensor_ae) + if err != nil { + t.Fatalf("Failed to create new sensor") } // Cleanup @@ -239,34 +289,69 @@ func TestGetSensorMqtt(t *testing.T) { tm = nil } -// func TestVaidateOneM2MNotificationServer(t *testing.T) { +// func TestOneM2M_deleteHttp(t *testing.T) { // fmt.Println("--- ", t.Name()) // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "REST_HTTP", "lab-oai.etsi.org", 31110, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } -// tm.init() -// fmt.Println("Waiting for 2 minutes to do curl request: curl -v http://mec-platform.etsi.org:33122/sbxykqjr17/mep1/sens/v1 ") +// var new_sensor = SensorDiscoveryInfo{ +// SensorIdentifier: "12345", +// SensorType: "AE", +// SensorPosition: nil, +// IotPlatformId: "7feaadbb0400", +// } +// sensor, err := oneM2M_create(tm, new_sensor) +// if err != nil { +// t.Fatalf("Failed to create new sensor: ", err.Error()) +// } + +// err = tm.OneM2M_Delete(sensor) +// if err != nil { +// t.Fatalf("Failed to create new sensor: ", err.Error()) +// } // // Cleanup // err = tm.DeleteSssMgr() // if err != nil { // t.Fatalf("Failed to cleanup SSS Asset Manager") // } +// tm = nil // } -// func TestGetSensor(t *testing.T) { +// func TestPopulateDevicesPerIotPlatformsMqtt(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } +// err = tm.populateDevicesPerIotPlatforms() +// if err != nil { +// t.Fatalf(err.Error()) +// } +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// tm = nil +// } + +// func TestSensorDiscoveryInfoAllMqtt(t *testing.T) { // fmt.Println("--- ", t.Name()) // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } @@ -282,14 +367,37 @@ func TestGetSensorMqtt(t *testing.T) { // } // fmt.Println("sensors: ", sensors) -// idx := rand.Int31n(int32(len(sensors))) -// sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier) +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// } + +// func TestGetSensorMqtt(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } + +// sensors, err := tm.SensorDiscoveryInfoAll() // if err != nil { // t.Fatalf(err.Error()) // } -// fmt.Println("sensor: ", sensor) -// if !validate_sensor_discovery_info(sensors[idx], sensor) { -// t.Fatalf("Value mismatch") + +// for _, v := range sensors { +// fmt.Println("v", v) +// fmt.Println("TypeOf(v)", reflect.TypeOf(v)) + +// sensor, err := tm.GetSensor(v.SensorIdentifier) +// if !validate_sensor_discovery_info(v, sensor) { +// t.Fatalf(err.Error()) +// } // } // // Cleanup @@ -297,55 +405,82 @@ func TestGetSensorMqtt(t *testing.T) { // if err != nil { // t.Fatalf("Failed to cleanup SSS Asset Manager") // } +// tm = nil // } -// func TestOneM2mCreateAEAndCNT(t *testing.T) { +// func TestOneM2M_createAEMQTT(t *testing.T) { // fmt.Println("--- ", t.Name()) // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } -// // Register new IotPlatform and create AE entry -// fmt.Println("Register new IotPlatform and create AE entry") -// iotPlatformInfo, err := registerIotPltfAndCreateAE(tm) +// var new_sensor = SensorDiscoveryInfo{ +// SensorIdentifier: "12345", +// SensorType: "AE", +// SensorPosition: nil, +// IotPlatformId: "7feaadbb0400", +// } +// sensor, err := tm.OneM2M_create(new_sensor, "") // if err != nil { -// t.Fatalf("registerIotPltfAndCreateAE failure") +// t.Fatalf("Failed to create new sensor") +// } + +// // Verify content +// fmt.Println(">>> validate_sensor_discovery_info_1: expected_sensor: ", new_sensor) +// fmt.Println(">>> validate_sensor_discovery_info_1: received_sensor: ", sensor) +// // if sensor.SensorIdentifier != new_sensor.SensorIdentifier { // FIXME FSCOM SensorIdentifier will be replaced by oneM2M ri +// // t.Fatalf("received_sensor.SensorIdentifier != SensorIdentifier") +// // } +// if sensor.SensorType != new_sensor.SensorType { +// t.Fatalf("received_sensor.SensorType != SensorType") // } -// // Get the new IotPlatform and create AE entry -// fmt.Println("Get the new IotPlatform and create AE entry") -// deviceResp_1, err := tm.oneM2M_get(*iotPlatformInfo.DeviceInfo, iotPlatformInfo.IotPlatformId, "AE") +// if sensor.IotPlatformId != new_sensor.IotPlatformId { +// t.Fatalf("received_sensor.IotPlatformId != IotPlatformId") +// } +// if len(sensor.SensorCharacteristicList) == 0 { +// t.Fatalf("received_sensor.SensorCharacteristicList shall not be empty") +// } + +// _ = tm.OneM2M_Delete(sensor) + +// // Cleanup +// err = tm.DeleteSssMgr() // if err != nil { -// t.Fatalf("oneM2M_get failure") +// t.Fatalf("Failed to cleanup SSS Asset Manager") // } -// // Check deviceResp vs. deviceResp_1 -// fmt.Println("Check deviceResp vs. deviceResp_1") -// if !validate_device_info(*iotPlatformInfo.DeviceInfo, deviceResp_1) { -// t.Fatalf("validate_device_info failure") +// tm = nil +// } + +// func TestOneM2M_deleteMQTT(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") // } -// // Create a device for the IotPlatform -// var device = DeviceInfo{ -// RequestedIotPlatformId: iotPlatformInfo.IotPlatformId, -// SensorIdentifier: "Device1", -// Enabled: true, -// //DeviceMetadata: [KeyValuePair{Key: "pi", Value: *iotPlatformInfo.DeviceInfo.DeviceMetadata[]}] +// var new_sensor = SensorDiscoveryInfo{ +// SensorIdentifier: "12345", +// SensorType: "AE", +// SensorPosition: nil, +// IotPlatformId: "7feaadbb0400", // } -// device, err = tm.oneM2M_create(device, iotPlatformInfo.IotPlatformId, "CNT") +// sensor, err := oneM2M_create(tm, new_sensor, "") // if err != nil { -// t.Fatalf("oneM2M_create failed to create a device") +// t.Fatalf("Failed to create new sensor: " + err.Error()) // } -// fmt.Println("device: ", device) -// // Delete the new IotPlatform entry -// fmt.Println("Delete the new IotPlatform entry") -// err = tm.oneM2M_delete(*iotPlatformInfo.DeviceInfo, iotPlatformInfo.IotPlatformId, "AE") +// err = tm.OneM2M_Delete(sensor) // if err != nil { -// t.Fatalf("oneM2M_create failure") +// t.Fatalf("Failed to create new sensor: " + err.Error()) // } // // Cleanup @@ -353,76 +488,39 @@ func TestGetSensorMqtt(t *testing.T) { // if err != nil { // t.Fatalf("Failed to cleanup SSS Asset Manager") // } - -// // t.Fatalf("DONE") +// tm = nil // } -// func registerIotPltfAndCreateAE(tm *SssMgr) (iotPlatformInfo IotPlatformInfo, err error) { - -// // Set a valid platform -// var adresses = []Addresses{} -// adresses = append(adresses, Addresses{ -// Host: "172.29.10.56", -// Port: 1883, -// }) -// var endpoint = EndPointInfo{ -// Addresses: adresses, -// } -// var userTransportInfo = []MbTransportInfo{} -// userTransportInfo = append(userTransportInfo, MbTransportInfo{ -// Id: "d5673793-c55c-4969-b5bc-2121f84b9f8d", -// Name: "MQTT", -// Description: "MQTT", -// Protocol: "MQTT", -// Version: "2", -// Endpoint: &endpoint, -// }) -// var adresses_1 = []Addresses{} -// adresses_1 = append(adresses_1, Addresses{ -// Host: "172.29.10.20", -// Port: 31110, -// }) -// var customServicesTransportInfo = []TransportInfo{} -// var endPointInfo_1 = EndPointInfo{ -// Addresses: adresses_1, -// } -// customServicesTransportInfo = append(customServicesTransportInfo, TransportInfo{ -// Id: "2ddb713c-2b41-4ded-a7ad-a5a047c5df13", -// Name: "/laboai-acme-ic-cse", -// Description: "ACME oneM2M CSE", -// Protocol: "REST_HTTP", -// Version: "4", -// Endpoint: &endPointInfo_1, -// }) -// iotPlatformInfo = IotPlatformInfo{ -// IotPlatformId: "523f2df1-8927-429f-906c-56ba92d13762", -// UserTransportInfo: userTransportInfo, -// CustomServicesTransportInfo: customServicesTransportInfo, -// Enabled: true, -// } -// err = tm.RegisterIotPlatformInfo(iotPlatformInfo) -// if err != nil { -// return iotPlatformInfo, err -// } +// func TestVaidateOneM2MNotificationServer(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) -// // OneM2M create MEC pltf as an AE -// requestedIotPlatformId := iotPlatformInfo.IotPlatformId -// var device = DeviceInfo{ -// RequestedIotPlatformId: requestedIotPlatformId, -// SensorIdentifier: requestedIotPlatformId, -// Enabled: true, +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "MQTT", "172.29.10.56", 1883, "7feaadbb0400", "laboai-acme-ic-cse", nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") // } -// d, err := tm.oneM2M_create(device, requestedIotPlatformId, "AE") + +// tm.init() +// fmt.Println("Waiting for 2 minutes to do curl request: curl -v http://mec-platform.etsi.org:33122/sbxykqjr17/mep1/sens/v1 ") + +// // Cleanup +// err = tm.DeleteSssMgr() // if err != nil { -// return iotPlatformInfo, err +// t.Fatalf("Failed to cleanup SSS Asset Manager") // } -// iotPlatformInfo.DeviceInfo = new(DeviceInfo) -// *iotPlatformInfo.DeviceInfo = d -// fmt.Println("iotPlatformInfo.DeviceInfo: ", *iotPlatformInfo.DeviceInfo) - -// return iotPlatformInfo, nil // } +func oneM2M_create(tm *SssMgr, sensor SensorDiscoveryInfo, path string) (sensorResp SensorDiscoveryInfo, err error) { + sensorResp, err = tm.OneM2M_create(sensor, path) + if err != nil { + return sensorResp, err + } + + return sensorResp, nil +} + func validate_sensor_discovery_info(expected_sensor SensorDiscoveryInfo, received_sensor SensorDiscoveryInfo) bool { fmt.Println(">>> validate_sensor_discovery_info: expected_sensor: ", expected_sensor) fmt.Println(">>> validate_sensor_discovery_info: received_sensor: ", received_sensor)