From 18cbd38c7b6aa4368295f1761d2fac4a2db7cf26 Mon Sep 17 00:00:00 2001 From: garciay <yann.garcia@fscom.fr> Date: Mon, 24 Mar 2025 16:07:24 +0100 Subject: [PATCH] Add oneM2M MQTT support --- go-apps/meep-sss/Dockerfile | 4 + go-packages/meep-sss-mgr/mqtt.go | 10 +- go-packages/meep-sss-mgr/onem2m-mgr.go | 761 +++++++++++++------- go-packages/meep-sss-mgr/onem2m-mgr_test.go | 271 +++++-- go-packages/meep-vis-traffic-mgr/mqtt.go | 6 +- 5 files changed, 716 insertions(+), 336 deletions(-) diff --git a/go-apps/meep-sss/Dockerfile b/go-apps/meep-sss/Dockerfile index b238bbe6c..d5d291efa 100644 --- a/go-apps/meep-sss/Dockerfile +++ b/go-apps/meep-sss/Dockerfile @@ -20,4 +20,8 @@ COPY ./data / RUN chmod +x /entrypoint.sh +RUN dpkg --configure -a + +EXPOSE 33122/tcp + ENTRYPOINT ["/entrypoint.sh"] diff --git a/go-packages/meep-sss-mgr/mqtt.go b/go-packages/meep-sss-mgr/mqtt.go index 9928dab14..321a3e50a 100644 --- a/go-packages/meep-sss-mgr/mqtt.go +++ b/go-packages/meep-sss-mgr/mqtt.go @@ -180,15 +180,7 @@ func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err err d := make(map[string]int, 0) for k, v := range p_ctx.queries { if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}" - //body[k] = v - i, err := strconv.Atoi(v) // body[k], err := strconv.Atoi(v) - body[k] = i // + 1 - /** - * FIXME FSCOM Based on ACME, there is a different behavior between MQTT & HTTP DISCOVERY to get the list of contaimer. - * For HTTP, ty = 3 (AE !?) - * FOR MQTT, ty = 4 (CNT) - * This is the reason of the 'body[k] = i + 1' for MQTT and not for HTTP - */ + body[k], err = strconv.Atoi(v) if err != nil { log.Error(err.Error()) return err, nil diff --git a/go-packages/meep-sss-mgr/onem2m-mgr.go b/go-packages/meep-sss-mgr/onem2m-mgr.go index ae91484e1..237b9e97d 100644 --- a/go-packages/meep-sss-mgr/onem2m-mgr.go +++ b/go-packages/meep-sss-mgr/onem2m-mgr.go @@ -21,8 +21,6 @@ import ( "encoding/json" "errors" "fmt" - "io" - "io/ioutil" "net/http" "reflect" "strconv" @@ -33,13 +31,15 @@ import ( log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" uuid "github.com/google/uuid" - "github.com/gorilla/mux" ) // Sensors-Sharing Service Manager type SssMgr struct { name string namespace string + bindingProtocol string + host string + port int mutex sync.Mutex wg sync.WaitGroup refreshTicker *time.Ticker @@ -88,16 +88,16 @@ const profiling = false var profilingTimers map[string]time.Time const ( - headerAccept = "application/json" - headerContentType = "application/json" iot_platform_address = "lab-oai.etsi.org" iot_platform_port = 31110 iot_platform_name = "laboai-acme-ic-cse" iot_platform_id = "7feaadbb0400" ) +var protocol SssMgrBindingProtocol + // NewSssMgr - Creates and initializes a new SSS Traffic Manager -func NewSssMgr(name string, namespace string, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) { +func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) { if name == "" { err = errors.New("Missing connector name") return nil, err @@ -112,6 +112,32 @@ func NewSssMgr(name string, namespace string, sss_discovery_notify func(), sss_s tm.namespace = "default" } + tm.bindingProtocol = bindingProtocol + tm.host = host + tm.port = port + if tm.bindingProtocol == "MQTT" { + if tm.host == "" { + err := errors.New("Host not set for MQTTP protocol") + log.Error(err.Error()) + return nil, err + } + if tm.port == 0 { + tm.port = 1883 + } + protocol = NewSssMgrMqtt() + } else if tm.bindingProtocol == "HTTP" { + protocol = NewSssMgrHttp() + } else { + err := errors.New("Binding protocol not set") + log.Error(err.Error()) + return nil, err + } + err = protocol.init(tm) + if err != nil { + log.Error(err.Error()) + return nil, err + } + tm.sss_discovery_notify = sss_discovery_notify tm.sss_status_notify = sss_status_notify tm.sss_data_notify = sss_data_notify @@ -137,12 +163,17 @@ func (tm *SssMgr) init() { sensorsMap = make(map[string]SensorDiscoveryInfo, 0) sensorsPerPlatformMap = make(map[string][]string, 0) tm.refreshTicker = nil + } // DeleteSssMgr - func (tm *SssMgr) DeleteSssMgr() (err error) { tm.stopRefreshTicker() + if protocol != nil { + protocol.uninit() + protocol = nil + } return nil } @@ -283,207 +314,341 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { profilingTimers["populateSensors"] = time.Now() } - log.Info(">>> populateSensors: iotPlatformId=", iotPlatformInfo.Address) + log.Info(">>> populateSensors: iotPlatformInfo=", iotPlatformInfo) // 1. Get the list of the AE - // Build the URL - url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + iotPlatformInfo.Name - log.Debug("populateSensors: url=", url) - // Build the headers - var headers = http.Header{} - headers["Accept"] = []string{headerAccept} - headers["Content-Type"] = []string{headerContentType} - headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it - headers["X-M2M-RI"] = []string{uuid.New().String()} - headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it + // Build the context + var ctx = SssMgrBindingProtocolContext{ + host: iotPlatformInfo.Address, + port: iotPlatformInfo.Port, + name: iotPlatformInfo.Name, + to: iotPlatformInfo.Name, + from: "Admin", // FIXME FSCOM How to get it + op: 2, // RETRIEVE + ty: -1, + rqi: uuid.New().String(), + rvi: []string{"4"}, // FIXME FSCOM How to get it + code: 200, + } // Build the queries queries := map[string]string{} queries["fu"] = "1" // Filter usage - queries["ty"] = "4" // Filter on oneM2M CIN for sensors - // Send the request - response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) + queries["ty"] = "3" // Filter on oneM2M CIN for sensors + ctx.queries = queries + err, resp := protocol.send(ctx) if err != nil { - log.Error("populateSensors: ", err.Error()) - return err - } - log.Debug("populateSensors: response: ", string(response)) - - var oneM2M_uril map[string][]string - err = json.Unmarshal(response, &oneM2M_uril) - if err != nil { - log.Error("populateSensors: ", err.Error()) - return err - } - log.Debug("populateSensors: oneM2M_uril: ", len(oneM2M_uril)) - log.Debug(oneM2M_uril) - if _, ok := oneM2M_uril["m2m:uril"]; !ok { - err := errors.New("populateSensors: CharacteristicName not found: m2m:uril") - log.Error(err.Error()) + log.Error("oneM2M_create: ", err.Error()) return err } + log.Debug("populateSensors: resp: ", resp) + log.Debug("populateSensors: TypeOf(resp): ", reflect.TypeOf(resp)) + oneM2M_uril := resp.(map[string]interface{}) + log.Debug("populateSensors: oneM2M_uril: ", oneM2M_uril) + log.Debug("populateSensors: TypeOf(oneM2M_uril): ", reflect.TypeOf(oneM2M_uril)) + log.Debug("populateSensors: len(oneM2M_uril): ", len(oneM2M_uril)) // Loop for each CIN and build the sensor list - for _, v := range oneM2M_uril["m2m:uril"] { - log.Debug("populateSensors: Processing key: ", v) - - url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + v - log.Debug("populateSensors: url=", url) - // Build the headers - var headers = http.Header{} - headers["Accept"] = []string{headerAccept} - headers["Content-Type"] = []string{headerContentType} - headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it - headers["X-M2M-RI"] = []string{uuid.New().String()} - headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it - // Build the queries - queries := map[string]string{} - queries["fu"] = "2" // Filter usage - // Send the request - response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) - if err != nil { - log.Error("populateSensors: ", err.Error()) - return err + for _, v := range oneM2M_uril["m2m:uril"].([]interface{}) { + log.Debug("populateSensors: Processing key: v: ", v) + log.Debug("populateSensors: Processing key: TypeOf(v): ", reflect.TypeOf(v)) + s := v.(string) + if s == "laboai-cse-in/acpCreateACPs" || s == "laboai-cse-in/CAdmin" { + // FIXME FSCOM Bug in MQTT DISCOVERY request which does not provide the same response that HTTP DISCOVERY with the same filter criteria + continue // Discard it } - log.Debug("populateSensors: response: ", string(response)) - var oneM2M_cin map[string]map[string]interface{} - err = json.Unmarshal(response, &oneM2M_cin) + ctx.to = s + ctx.queries["fu"] = "2" + err, resp := protocol.send(ctx) if err != nil { - log.Error("populateSensors: ", err.Error()) + log.Error("oneM2M_create: ", err.Error()) + continue + } + log.Debug("populateSensors: resp: ", resp) + log.Debug("populateSensors: type(resp): ", reflect.TypeOf(resp)) + if resp.(map[string]interface{}) == nil || resp.(map[string]interface{})["m2m:cnt"] == nil { continue } + oneM2M_cin := resp.(map[string]interface{})["m2m:cnt"].(map[string]interface{}) log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin)) log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin) - for _, m := range oneM2M_cin { - //log.Debug("==> ", i, " value is ", m) - var sensor = SensorDiscoveryInfo{ - IotPlatformId: iotPlatformInfo.IotPlatformId, - } + var sensor = SensorDiscoveryInfo{ + IotPlatformId: iotPlatformInfo.IotPlatformId, + } + for k, v := range oneM2M_cin { + log.Debug(k, " value is ", v) + log.Debug("populateSensors: type(v): ", reflect.TypeOf(v)) - // m is a map[string]interface. - // loop over keys and values in the map. - for k, v := range m { - log.Debug(k, " value is ", v) - log.Debug("populateSensors: type(v): ", reflect.TypeOf(v)) - - if k == "ri" { - if item, ok := v.(string); ok { - sensor.SensorIdentifier = item - } else { - log.Error("populateSensors: Failed to process ", k) - } - } else if k == "ty" { - if item, ok := v.(float64); ok { - sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64) - } else { - log.Error("populateSensors: Failed to process ", k) - } + if k == "ri" { + if item, ok := v.(string); ok { + sensor.SensorIdentifier = item + } else { + log.Error("populateSensors: Failed to process ", k) + } + } else if k == "ty" { + if item, ok := v.(float64); ok { + sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64) } else { - sensor.SensorPropertyList = append(sensor.SensorPropertyList, k) - if item, ok := v.(string); ok { - sensor.SensorCharacteristicList = append( - sensor.SensorCharacteristicList, - SensorCharacteristic{ - CharacteristicName: k, - CharacteristicValue: string(item), - }) - } else if item, ok := v.(float64); ok { - sensor.SensorCharacteristicList = append( - sensor.SensorCharacteristicList, - SensorCharacteristic{ - CharacteristicName: k, - CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), - }) - } else if item, ok := v.(int64); ok { - sensor.SensorCharacteristicList = append( - sensor.SensorCharacteristicList, - SensorCharacteristic{ - CharacteristicName: k, - CharacteristicValue: strconv.FormatInt(item, 10), - }) - } else if item, ok := v.(bool); ok { - sensor.SensorCharacteristicList = append( - sensor.SensorCharacteristicList, - SensorCharacteristic{ - CharacteristicName: k, - CharacteristicValue: strconv.FormatBool(item), - }) - } else if item, ok := v.([]string); ok { - sensor.SensorCharacteristicList = append( - sensor.SensorCharacteristicList, - SensorCharacteristic{ - CharacteristicName: k, - CharacteristicValue: strings.Join(item, ","), - }) - } else if item, ok := v.([]int64); ok { - log.Error("populateSensors: Failed to convert list of int64 into string: ", item) - } else if item, ok := v.([]interface{}); ok { - log.Debug("populateSensors: Got []interface {} for ", k) - log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) - s := SensorCharacteristic{ - CharacteristicName: k, - } - var buf bytes.Buffer - fmt.Fprintf(&buf, "%T", reflect.ValueOf(item)) - s.CharacteristicValue = buf.String() - sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) - } else { - log.Error("populateSensors: Failed to process ", k) + log.Error("populateSensors: Failed to process ", k) + } + } else { + sensor.SensorPropertyList = append(sensor.SensorPropertyList, k) + if item, ok := v.(string); ok { + sensor.SensorCharacteristicList = append( + sensor.SensorCharacteristicList, + SensorCharacteristic{ + CharacteristicName: k, + CharacteristicValue: string(item), + }) + } else if item, ok := v.(float64); ok { + sensor.SensorCharacteristicList = append( + sensor.SensorCharacteristicList, + SensorCharacteristic{ + CharacteristicName: k, + CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), + }) + } else if item, ok := v.(int64); ok { + sensor.SensorCharacteristicList = append( + sensor.SensorCharacteristicList, + SensorCharacteristic{ + CharacteristicName: k, + CharacteristicValue: strconv.FormatInt(item, 10), + }) + } else if item, ok := v.(bool); ok { + sensor.SensorCharacteristicList = append( + sensor.SensorCharacteristicList, + SensorCharacteristic{ + CharacteristicName: k, + CharacteristicValue: strconv.FormatBool(item), + }) + } else if item, ok := v.([]string); ok { + sensor.SensorCharacteristicList = append( + sensor.SensorCharacteristicList, + SensorCharacteristic{ + CharacteristicName: k, + CharacteristicValue: strings.Join(item, ","), + }) + } else if item, ok := v.([]int64); ok { + log.Error("populateSensors: Failed to convert list of int64 into string: ", item) + } else if item, ok := v.([]interface{}); ok { + log.Debug("populateSensors: Got []interface {} for ", k) + log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) + s := SensorCharacteristic{ + CharacteristicName: k, } + var buf bytes.Buffer + fmt.Fprintf(&buf, "%T", reflect.ValueOf(item)) + s.CharacteristicValue = buf.String() + sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) + } else { + log.Error("populateSensors: Failed to process ", k) } - // if k == "rn" { - // if item, ok := v.(string); ok { - // sensor.DeviceId = item - // } else { - // log.Error("populateSensors: Failed to process ", k) - // } - // } else if k == "ri" { - // if item, ok := v.(string); ok { - // sensor.SensorIdentifier = item - // } else { - // log.Error("populateSensors: Failed to process ", k) - // } - // } else if k == "ty" { - // if item, ok := v.(float64); ok { - // sensor.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64) - // } else { - // log.Error("populateSensors: Failed to process ", k) - // } - // } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" { - // if item, ok := v.(string); ok { - // sensor.SensorCharacteristicList = append( - // sensor.SensorCharacteristicList, - // SensorCharacteristic{ - // CharacteristicName: k, - // CharacteristicValue: string(item), - // CharacteristicUnitOfMeasure: nil, - // }) - // } else if item, ok := v.(float64); ok { - // sensor.SensorCharacteristicList = append( - // sensor.SensorCharacteristicList, - // SensorCharacteristic{ - // CharacteristicName: k, - // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), - // CharacteristicUnitOfMeasure: nil, - // }) - // } else if item, ok := v.([]string); ok { - // sensor.SensorCharacteristicList = append( - // sensor.SensorCharacteristicList, - // SensorCharacteristic{ - // CharacteristicName: k, - // CharacteristicValue: strings.Join(item, ","), - // CharacteristicUnitOfMeasure: nil, - // }) - // } else { - // log.Error("populateSensors: Failed to process ", k) - // } - // } - } // End of 'for' loop - log.Info("populateSensors: sensor: ", sensor) - sensorsMap[sensor.SensorIdentifier] = sensor - sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier) - } // End of 'for' loop + } + } // End of 'for' statement + log.Info("populateSensors: sensor: ", sensor) + sensorsMap[sensor.SensorIdentifier] = sensor + sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier) } // End of 'for' statement + + // // 1. Get the list of the AE + // // Build the URL + // url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + iotPlatformInfo.Name + // log.Debug("populateSensors: url=", url) + // // Build the headers + // var headers = http.Header{} + // headers["Accept"] = []string{headerAccept} + // headers["Content-Type"] = []string{headerContentType} + // headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it + // headers["X-M2M-RI"] = []string{uuid.New().String()} + // headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it + // // Build the queries + // queries := map[string]string{} + // queries["fu"] = "1" // Filter usage + // queries["ty"] = "4" // Filter on oneM2M CIN for sensors + // // Send the request + // response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) + // if err != nil { + // log.Error("populateSensors: ", err.Error()) + // return err + // } + // log.Debug("populateSensors: response: ", string(response)) + + // var oneM2M_uril map[string][]string + // err = json.Unmarshal(response, &oneM2M_uril) + // if err != nil { + // log.Error("populateSensors: ", err.Error()) + // return err + // } + // log.Debug("populateSensors: oneM2M_uril: ", len(oneM2M_uril)) + // log.Debug(oneM2M_uril) + // if _, ok := oneM2M_uril["m2m:uril"]; !ok { + // err := errors.New("populateSensors: CharacteristicName not found: m2m:uril") + // log.Error(err.Error()) + // return err + // } + // // Loop for each CIN and build the sensor list + // for _, v := range oneM2M_uril["m2m:uril"] { + // log.Debug("populateSensors: Processing key: ", v) + + // url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + v + // log.Debug("populateSensors: url=", url) + // // Build the headers + // var headers = http.Header{} + // headers["Accept"] = []string{headerAccept} + // headers["Content-Type"] = []string{headerContentType} + // headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it + // headers["X-M2M-RI"] = []string{uuid.New().String()} + // headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it + // // Build the queries + // queries := map[string]string{} + // queries["fu"] = "2" // Filter usage + // // Send the request + // response, err := sendRequest("GET", url, headers, nil, nil, queries, 200) + // if err != nil { + // log.Error("populateSensors: ", err.Error()) + // return err + // } + // log.Debug("populateSensors: response: ", string(response)) + // var oneM2M_cin map[string]map[string]interface{} + // err = json.Unmarshal(response, &oneM2M_cin) + // if err != nil { + // log.Error("populateSensors: ", err.Error()) + // continue + // } + // log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin)) + // log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin)) + // log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin) + // for _, m := range oneM2M_cin { + // //log.Debug("==> ", i, " value is ", m) + // var sensor = SensorDiscoveryInfo{ + // IotPlatformId: iotPlatformInfo.IotPlatformId, + // } + + // // m is a map[string]interface. + // // loop over keys and values in the map. + // for k, v := range m { + // log.Debug(k, " value is ", v) + // log.Debug("populateSensors: type(v): ", reflect.TypeOf(v)) + + // if k == "ri" { + // if item, ok := v.(string); ok { + // sensor.SensorIdentifier = item + // } else { + // log.Error("populateSensors: Failed to process ", k) + // } + // } else if k == "ty" { + // if item, ok := v.(float64); ok { + // sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64) + // } else { + // log.Error("populateSensors: Failed to process ", k) + // } + // } else { + // sensor.SensorPropertyList = append(sensor.SensorPropertyList, k) + // if item, ok := v.(string); ok { + // sensor.SensorCharacteristicList = append( + // sensor.SensorCharacteristicList, + // SensorCharacteristic{ + // CharacteristicName: k, + // CharacteristicValue: string(item), + // }) + // } else if item, ok := v.(float64); ok { + // sensor.SensorCharacteristicList = append( + // sensor.SensorCharacteristicList, + // SensorCharacteristic{ + // CharacteristicName: k, + // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), + // }) + // } else if item, ok := v.(int64); ok { + // sensor.SensorCharacteristicList = append( + // sensor.SensorCharacteristicList, + // SensorCharacteristic{ + // CharacteristicName: k, + // CharacteristicValue: strconv.FormatInt(item, 10), + // }) + // } else if item, ok := v.(bool); ok { + // sensor.SensorCharacteristicList = append( + // sensor.SensorCharacteristicList, + // SensorCharacteristic{ + // CharacteristicName: k, + // CharacteristicValue: strconv.FormatBool(item), + // }) + // } else if item, ok := v.([]string); ok { + // sensor.SensorCharacteristicList = append( + // sensor.SensorCharacteristicList, + // SensorCharacteristic{ + // CharacteristicName: k, + // CharacteristicValue: strings.Join(item, ","), + // }) + // } else if item, ok := v.([]int64); ok { + // log.Error("populateSensors: Failed to convert list of int64 into string: ", item) + // } else if item, ok := v.([]interface{}); ok { + // log.Debug("populateSensors: Got []interface {} for ", k) + // log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item)) + // s := SensorCharacteristic{ + // CharacteristicName: k, + // } + // var buf bytes.Buffer + // fmt.Fprintf(&buf, "%T", reflect.ValueOf(item)) + // s.CharacteristicValue = buf.String() + // sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s) + // } else { + // log.Error("populateSensors: Failed to process ", k) + // } + // } + // // if k == "rn" { + // // if item, ok := v.(string); ok { + // // sensor.DeviceId = item + // // } else { + // // log.Error("populateSensors: Failed to process ", k) + // // } + // // } else if k == "ri" { + // // if item, ok := v.(string); ok { + // // sensor.SensorIdentifier = item + // // } else { + // // log.Error("populateSensors: Failed to process ", k) + // // } + // // } else if k == "ty" { + // // if item, ok := v.(float64); ok { + // // sensor.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64) + // // } else { + // // log.Error("populateSensors: Failed to process ", k) + // // } + // // } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" { + // // if item, ok := v.(string); ok { + // // sensor.SensorCharacteristicList = append( + // // sensor.SensorCharacteristicList, + // // SensorCharacteristic{ + // // CharacteristicName: k, + // // CharacteristicValue: string(item), + // // CharacteristicUnitOfMeasure: nil, + // // }) + // // } else if item, ok := v.(float64); ok { + // // sensor.SensorCharacteristicList = append( + // // sensor.SensorCharacteristicList, + // // SensorCharacteristic{ + // // CharacteristicName: k, + // // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64), + // // CharacteristicUnitOfMeasure: nil, + // // }) + // // } else if item, ok := v.([]string); ok { + // // sensor.SensorCharacteristicList = append( + // // sensor.SensorCharacteristicList, + // // SensorCharacteristic{ + // // CharacteristicName: k, + // // CharacteristicValue: strings.Join(item, ","), + // // CharacteristicUnitOfMeasure: nil, + // // }) + // // } else { + // // log.Error("populateSensors: Failed to process ", k) + // // } + // // } + // } // End of 'for' loop + // log.Info("populateSensors: sensor: ", sensor) + // sensorsMap[sensor.SensorIdentifier] = sensor + // sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier) + // } // End of 'for' loop + + // } // End of 'for' statement log.Info("populateSensors: sensorsMap: ", sensorsMap) log.Info("populateSensors: sensorsPerPlatformMap: ", sensorsPerPlatformMap) @@ -498,21 +663,7 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error { func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (sensorResp SensorDiscoveryInfo, err error) { // FIXME FSCOM: requestedIotPlatformId should be useless - // Build the headers - var headers = http.Header{} - headers["Accept"] = []string{headerAccept} - headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it - headers["X-M2M-RI"] = []string{uuid.New().String()} - headers["X-M2M-RVI"] = []string{"4"} - var s string - if type_ == "AE" { - s = headerContentType + ";ty=2" - } else if type_ == "CNT" { - s = headerContentType + ";ty=4" - } - headers["Content-Type"] = []string{s} - // Build the url and the body - var url string + // Create the initial payload dictionary var bodyMap = map[string]map[string]interface{}{} // Initialize the entry if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer @@ -521,8 +672,6 @@ func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatform bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier bodyMap["m2m:ae"]["rr"] = false bodyMap["m2m:ae"]["srv"] = []string{"4"} - url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name - } else if type_ == "CNT" { bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) bodyMap["m2m:cnt"]["mbs"] = 10000 @@ -536,42 +685,116 @@ func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatform // FIXME FSCOM Add metadata } // End of 'for' statement } - url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name } else { err = errors.New("oneM2M_create: Invalid type") log.Error("oneM2M_create: ", err.Error()) return sensorResp, err } - log.Debug("oneM2M_create: url=", url) - log.Debug("oneM2M_create: bodyMap=", bodyMap) - body, err := json.Marshal(bodyMap) - if err != nil { - log.Error("oneM2M_create: ", err.Error()) - return sensorResp, err + + // Send it and get the result + var ctx = SssMgrBindingProtocolContext{ + host: registeredIotPlatformsMap[requestedIotPlatformId].Address, + port: registeredIotPlatformsMap[requestedIotPlatformId].Port, + name: registeredIotPlatformsMap[requestedIotPlatformId].Name, + to: registeredIotPlatformsMap[requestedIotPlatformId].Name, + from: requestedIotPlatformId, + op: 1, // CREATE + rqi: uuid.New().String(), + rvi: []string{"4"}, // FIXME FSCOM How to get it + body: bodyMap, + code: 201, } - log.Debug("oneM2M_create: Request body: ", string(body)) - // Send the request - response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) - if err != nil { - log.Error("oneM2M_create: ", err.Error()) + if type_ == "AE" { + ctx.ty = 2 + } else if type_ == "CNT" { + ctx.ty = 4 + } else { + err = errors.New("oneM2M_create: Invalid type") + log.Error("send: ", err.Error()) return sensorResp, err } - log.Debug("oneM2M_create: response: ", string(response)) - var d map[string]map[string]interface{} - err = json.Unmarshal(response, &d) + + //var resp = map[string]map[string]interface{}{} + err, resp := protocol.send(ctx) if err != nil { log.Error("oneM2M_create: ", err.Error()) return sensorResp, err } - log.Debug("oneM2M_create: d: ", d) + + // Build the headers + // var headers = http.Header{} + // headers["Accept"] = []string{headerAccept} + // headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it + // headers["X-M2M-RI"] = []string{uuid.New().String()} + // headers["X-M2M-RVI"] = []string{"4"} + // var s string + // if type_ == "AE" { + // s = headerContentType + ";ty=2" + // } else if type_ == "CNT" { + // s = headerContentType + ";ty=4" + // } + // headers["Content-Type"] = []string{s} + // // Build the url and the body + // var url string + // var bodyMap = map[string]map[string]interface{}{} + // Initialize the entry + // if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer + // bodyMap["m2m:ae"] = make(map[string]interface{}, 0) + // bodyMap["m2m:ae"]["api"] = "Norg.etsi." + requestedIotPlatformId + "." + sensor.SensorIdentifier + // bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier + // bodyMap["m2m:ae"]["rr"] = false + // bodyMap["m2m:ae"]["srv"] = []string{"4"} + // url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name + // } else if type_ == "CNT" { + // bodyMap["m2m:cnt"] = make(map[string]interface{}, 0) + // bodyMap["m2m:cnt"]["mbs"] = 10000 + // bodyMap["m2m:cnt"]["mni"] = 10 + // bodyMap["m2m:cnt"]["rn"] = sensor.SensorIdentifier + // bodyMap["m2m:cnt"]["srv"] = []string{"4"} + // // Add metadata + // if len(sensor.SensorCharacteristicList) != 0 { + // for _, val := range sensor.SensorCharacteristicList { + // log.Debug("oneM2M_create: Adding CNT metadata: ", val) + // // FIXME FSCOM Add metadata + // } // End of 'for' statement + // } + // url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name + // } else { + // err = errors.New("oneM2M_create: Invalid type") + // log.Error("oneM2M_create: ", err.Error()) + // return sensorResp, err + // } + // log.Debug("oneM2M_create: url=", url) + // log.Debug("oneM2M_create: bodyMap=", bodyMap) + // body, err := json.Marshal(bodyMap) + // if err != nil { + // log.Error("oneM2M_create: ", err.Error()) + // return sensorResp, err + // } + // log.Debug("oneM2M_create: Request body: ", string(body)) + // Send the request + // response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) + // if err != nil { + // log.Error("oneM2M_create: ", err.Error()) + // return sensorResp, err + // } + // log.Debug("oneM2M_create: response: ", string(response)) + // var d map[string]map[string]interface{} + // err = json.Unmarshal(response, &d) + // if err != nil { + // log.Error("oneM2M_create: ", err.Error()) + // return sensorResp, err + // } + log.Debug("oneM2M_create: d: ", resp) + log.Debug("oneM2M_create: TypeOf(d): ", reflect.TypeOf(resp)) // Add additional entries - sensorResp, err = tm.oneM2M_deserialize(sensorResp, d) - if err != nil { - log.Error("oneM2M_create: ", err.Error()) - return sensorResp, err - } - log.Debug("oneM2M_create: sensorResp: ", sensorResp) + // sensorResp, err = tm.oneM2M_deserialize(sensorResp, resp) + // if err != nil { + // log.Error("oneM2M_create: ", err.Error()) + // return sensorResp, err + // } + // log.Debug("oneM2M_create: sensorResp: ", sensorResp) return sensorResp, nil } @@ -683,6 +906,54 @@ func (tm *SssMgr) oneM2M_get(sensor SensorDiscoveryInfo, requestedIotPlatformId return sensorResp, nil } +func (tm *SssMgr) oneM2M_subscribe(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (subscription string, err error) { + // FIXME FSCOM: requestedIotPlatformId should be useless + + // Build the headers + var headers = http.Header{} + headers["Accept"] = []string{headerAccept} + headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it + headers["X-M2M-RI"] = []string{uuid.New().String()} + headers["X-M2M-RVI"] = []string{"4"} + s := headerContentType + ";ty=23" + headers["Content-Type"] = []string{s} + // Build the url and the body + var url string + var bodyMap = map[string]map[string]interface{}{} + bodyMap["m2m:sub"] = make(map[string]interface{}, 0) + net := make(map[string][4]int) + net["net"] = [4]int{1, 2, 3, 4} + bodyMap["m2m:sub"]["enc"] = net + bodyMap["m2m:sub"]["nu"] = "" // FIXME The URI of the listener + bodyMap["m2m:sub"]["rn"] = sensor.SensorIdentifier + url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name + log.Debug("oneM2M_subscribe: url=", url) + log.Debug("oneM2M_subscribe: bodyMap=", bodyMap) + body, err := json.Marshal(bodyMap) + if err != nil { + log.Error("oneM2M_subscribe: ", err.Error()) + return "", err + } + log.Debug("oneM2M_subscribe: Request body: ", string(body)) + // Send the request + response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201) + if err != nil { + log.Error("oneM2M_subscribe: ", err.Error()) + return "", err + } + log.Debug("oneM2M_subscribe: response: ", string(response)) + var d map[string]map[string]interface{} + err = json.Unmarshal(response, &d) + if err != nil { + log.Error("oneM2M_subscribe: ", err.Error()) + return "", err + } + log.Debug("oneM2M_subscribe: d: ", d) + + err = errors.New("oneM2M_subscribe: To be implemented") + return "", err /*nil*/ +} + func (tm *SssMgr) oneM2M_delete(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (err error) { // FIXME FSCOM: requestedIotPlatformId should be useless @@ -809,45 +1080,3 @@ func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[st return sensorResp, nil } - -func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) { - //log.Debug(">>> sendRequest: url: ", url) - //log.Debug(">>> sendRequest: headers: ", headers) - - req, err := http.NewRequest(method, url, body) - if err != nil || req == nil { - return nil, err - } - if vars != nil { - req = mux.SetURLVars(req, vars) - } - if query != nil { - q := req.URL.Query() - for k, v := range query { - q.Add(k, v) - } - req.URL.RawQuery = q.Encode() - } - req.Header = headers - req.Close = true - - //log.Debug("sendRequest: req: ", req) - rr, err := http.DefaultClient.Do(req) - if err != nil { - return nil, err - } - - // Check the status code is what we expect. - //log.Debug("sendRequest: rr: ", rr) - if status := rr.StatusCode; status != code { - s := fmt.Sprintf("Wrong status code - got %v want %v", status, code) - return nil, errors.New(s) - } - responseData, err := ioutil.ReadAll(rr.Body) - if err != nil { - return nil, err - } - //log.Debug("sendRequest: responseData: ", responseData) - - return responseData, nil -} diff --git a/go-packages/meep-sss-mgr/onem2m-mgr_test.go b/go-packages/meep-sss-mgr/onem2m-mgr_test.go index 554ba8b26..6e60cf770 100644 --- a/go-packages/meep-sss-mgr/onem2m-mgr_test.go +++ b/go-packages/meep-sss-mgr/onem2m-mgr_test.go @@ -18,7 +18,6 @@ package sssmgr import ( "fmt" - "math/rand" "reflect" "testing" @@ -34,107 +33,202 @@ func TestNewSssMgr(t *testing.T) { // Invalid Connector fmt.Println("Invalid SSS Asset Manager") - tm, err := NewSssMgr("", tmNamespace, nil, nil, nil) + tm, err := NewSssMgr("", tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) if err == nil || tm != nil { - t.Fatalf("DB connection should have failed") + t.Fatalf("Service name not set") + } + tm, err = NewSssMgr(tmName, tmNamespace, "", "172.29.10.56", 1883, nil, nil, nil) + if err == nil || tm != nil { + t.Fatalf("Binding protocol not set") + } + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "", 1883, nil, nil, nil) + if err == nil || tm != nil { + t.Fatalf("Binding protocol not set") } // Valid Connector fmt.Println("Create valid SSS Asset Manager") - tm, err = NewSssMgr(tmName, tmNamespace, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } - // Cleanup err = tm.DeleteSssMgr() if err != nil { t.Fatalf("Failed to cleanup SSS Asset Manager") } -} + tm = nil -func TestPopulateDevicesPerIotPlatforms(t *testing.T) { - fmt.Println("--- ", t.Name()) - log.MeepTextLogInit(t.Name()) - - // Valid Connector fmt.Println("Create valid SSS Asset Manager") - tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil) + tm, err = NewSssMgr(tmName, tmNamespace, "HTTP", "172.29.10.56", 1883, nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } - err = tm.populateDevicesPerIotPlatforms() - if err != nil { - t.Fatalf(err.Error()) - } - // Cleanup err = tm.DeleteSssMgr() if err != nil { t.Fatalf("Failed to cleanup SSS Asset Manager") } + tm = nil } -func TestSensorDiscoveryInfoAll(t *testing.T) { - fmt.Println("--- ", t.Name()) - log.MeepTextLogInit(t.Name()) +// func TestPopulateDevicesPerIotPlatformsHttp(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } - // Valid Connector - fmt.Println("Create valid SSS Asset Manager") - tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil) - if err != nil || tm == nil { - t.Fatalf("Failed to create SSS Asset Manager") - } +// err = tm.populateDevicesPerIotPlatforms() +// if err != nil { +// t.Fatalf(err.Error()) +// } - err = tm.populateDevicesPerIotPlatforms() - if err != nil { - t.Fatalf(err.Error()) - } +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// tm = nil +// } - sensors, err := tm.SensorDiscoveryInfoAll() - if err != nil { - t.Fatalf(err.Error()) - } - fmt.Println("sensors: ", sensors) +// func TestSensorDiscoveryInfoAllHttp(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } - // Cleanup - err = tm.DeleteSssMgr() - if err != nil { - t.Fatalf("Failed to cleanup SSS Asset Manager") - } -} +// sensors, err := tm.SensorDiscoveryInfoAll() +// if err != nil { +// t.Fatalf(err.Error()) +// } +// fmt.Println("len=", len(sensors)) +// fmt.Println("sensors", sensors) + +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// tm = nil +// } + +// func TestGetSensorHttp(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } + +// sensors, err := tm.SensorDiscoveryInfoAll() +// if err != nil { +// t.Fatalf(err.Error()) +// } + +// for _, v := range sensors { +// fmt.Println("v", v) +// fmt.Println("TypeOf(v)", reflect.TypeOf(v)) + +// sensor, err := tm.GetSensor(v.SensorIdentifier) +// if !validate_sensor_discovery_info(v, sensor) { +// t.Fatalf(err.Error()) +// } +// } + +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// tm = nil +// } + +// func TestPopulateDevicesPerIotPlatformsMqtt(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } +// err = tm.populateDevicesPerIotPlatforms() +// if err != nil { +// t.Fatalf(err.Error()) +// } +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// tm = nil +// } + +// func TestSensorDiscoveryInfoAllMqtt(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } + +// err = tm.populateDevicesPerIotPlatforms() +// if err != nil { +// t.Fatalf(err.Error()) +// } + +// sensors, err := tm.SensorDiscoveryInfoAll() +// if err != nil { +// t.Fatalf(err.Error()) +// } +// fmt.Println("sensors: ", sensors) + +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// } -func TestGetSensor(t *testing.T) { +func TestGetSensorMqtt(t *testing.T) { fmt.Println("--- ", t.Name()) log.MeepTextLogInit(t.Name()) // Valid Connector fmt.Println("Create valid SSS Asset Manager") - tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil) + tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) if err != nil || tm == nil { t.Fatalf("Failed to create SSS Asset Manager") } - err = tm.populateDevicesPerIotPlatforms() - if err != nil { - t.Fatalf(err.Error()) - } - sensors, err := tm.SensorDiscoveryInfoAll() if err != nil { t.Fatalf(err.Error()) } - fmt.Println("sensors: ", sensors) - idx := rand.Int31n(int32(len(sensors))) - sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier) - if err != nil { - t.Fatalf(err.Error()) - } - fmt.Println("sensor: ", sensor) - if !validate_sensor_discovery_info(sensors[idx], sensor) { - t.Fatalf("Value mismatch") + for _, v := range sensors { + fmt.Println("v", v) + fmt.Println("TypeOf(v)", reflect.TypeOf(v)) + + sensor, err := tm.GetSensor(v.SensorIdentifier) + if !validate_sensor_discovery_info(v, sensor) { + t.Fatalf(err.Error()) + } } // Cleanup @@ -142,15 +236,76 @@ func TestGetSensor(t *testing.T) { if err != nil { t.Fatalf("Failed to cleanup SSS Asset Manager") } + tm = nil } +// func TestVaidateOneM2MNotificationServer(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } + +// tm.init() +// fmt.Println("Waiting for 2 minutes to do curl request: curl -v http://mec-platform.etsi.org:33122/sbxykqjr17/mep1/sens/v1 ") + +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// } + +// func TestGetSensor(t *testing.T) { +// fmt.Println("--- ", t.Name()) +// log.MeepTextLogInit(t.Name()) + +// // Valid Connector +// fmt.Println("Create valid SSS Asset Manager") +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) +// if err != nil || tm == nil { +// t.Fatalf("Failed to create SSS Asset Manager") +// } + +// err = tm.populateDevicesPerIotPlatforms() +// if err != nil { +// t.Fatalf(err.Error()) +// } + +// sensors, err := tm.SensorDiscoveryInfoAll() +// if err != nil { +// t.Fatalf(err.Error()) +// } +// fmt.Println("sensors: ", sensors) + +// idx := rand.Int31n(int32(len(sensors))) +// sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier) +// if err != nil { +// t.Fatalf(err.Error()) +// } +// fmt.Println("sensor: ", sensor) +// if !validate_sensor_discovery_info(sensors[idx], sensor) { +// t.Fatalf("Value mismatch") +// } + +// // Cleanup +// err = tm.DeleteSssMgr() +// if err != nil { +// t.Fatalf("Failed to cleanup SSS Asset Manager") +// } +// } + // func TestOneM2mCreateAEAndCNT(t *testing.T) { // fmt.Println("--- ", t.Name()) // log.MeepTextLogInit(t.Name()) // // Valid Connector // fmt.Println("Create valid SSS Asset Manager") -// tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil) +// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil) // if err != nil || tm == nil { // t.Fatalf("Failed to create SSS Asset Manager") // } diff --git a/go-packages/meep-vis-traffic-mgr/mqtt.go b/go-packages/meep-vis-traffic-mgr/mqtt.go index 9b2af487a..07cec8eb2 100644 --- a/go-packages/meep-vis-traffic-mgr/mqtt.go +++ b/go-packages/meep-vis-traffic-mgr/mqtt.go @@ -161,9 +161,9 @@ func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, log.Error(err.Error()) return err } - log.Info("message_broker_simu: Send: Publish content : ", content) - log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat) - log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization) + log.Info("message_broker_mqtt: Send: Publish content : ", content) + log.Info("message_broker_mqtt: Send: msgEncodeFormat: ", msgEncodeFormat) + log.Info("message_broker_mqtt: Send: stdOrganization: ", stdOrganization) token := broker_mqtt.client.Publish(tm.topic, 0, false, content) token.Wait() -- GitLab