Skip to content
Snippets Groups Projects
Commit 18cbd38c authored by Yann Garcia's avatar Yann Garcia
Browse files

Add oneM2M MQTT support

parent 648fbb1f
No related branches found
No related tags found
No related merge requests found
......@@ -20,4 +20,8 @@ COPY ./data /
RUN chmod +x /entrypoint.sh
RUN dpkg --configure -a
EXPOSE 33122/tcp
ENTRYPOINT ["/entrypoint.sh"]
......@@ -180,15 +180,7 @@ func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err err
d := make(map[string]int, 0)
for k, v := range p_ctx.queries {
if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}"
//body[k] = v
i, err := strconv.Atoi(v) // body[k], err := strconv.Atoi(v)
body[k] = i // + 1
/**
* FIXME FSCOM Based on ACME, there is a different behavior between MQTT & HTTP DISCOVERY to get the list of contaimer.
* For HTTP, ty = 3 (AE !?)
* FOR MQTT, ty = 4 (CNT)
* This is the reason of the 'body[k] = i + 1' for MQTT and not for HTTP
*/
body[k], err = strconv.Atoi(v)
if err != nil {
log.Error(err.Error())
return err, nil
......
......@@ -21,8 +21,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"reflect"
"strconv"
......@@ -33,13 +31,15 @@ import (
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
uuid "github.com/google/uuid"
"github.com/gorilla/mux"
)
// Sensors-Sharing Service Manager
type SssMgr struct {
name string
namespace string
bindingProtocol string
host string
port int
mutex sync.Mutex
wg sync.WaitGroup
refreshTicker *time.Ticker
......@@ -88,16 +88,16 @@ const profiling = false
var profilingTimers map[string]time.Time
const (
headerAccept = "application/json"
headerContentType = "application/json"
iot_platform_address = "lab-oai.etsi.org"
iot_platform_port = 31110
iot_platform_name = "laboai-acme-ic-cse"
iot_platform_id = "7feaadbb0400"
)
var protocol SssMgrBindingProtocol
// NewSssMgr - Creates and initializes a new SSS Traffic Manager
func NewSssMgr(name string, namespace string, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) {
func NewSssMgr(name string, namespace string, bindingProtocol string, host string, port int, sss_discovery_notify func(), sss_status_notify func(), sss_data_notify func()) (tm *SssMgr, err error) {
if name == "" {
err = errors.New("Missing connector name")
return nil, err
......@@ -112,6 +112,32 @@ func NewSssMgr(name string, namespace string, sss_discovery_notify func(), sss_s
tm.namespace = "default"
}
tm.bindingProtocol = bindingProtocol
tm.host = host
tm.port = port
if tm.bindingProtocol == "MQTT" {
if tm.host == "" {
err := errors.New("Host not set for MQTTP protocol")
log.Error(err.Error())
return nil, err
}
if tm.port == 0 {
tm.port = 1883
}
protocol = NewSssMgrMqtt()
} else if tm.bindingProtocol == "HTTP" {
protocol = NewSssMgrHttp()
} else {
err := errors.New("Binding protocol not set")
log.Error(err.Error())
return nil, err
}
err = protocol.init(tm)
if err != nil {
log.Error(err.Error())
return nil, err
}
tm.sss_discovery_notify = sss_discovery_notify
tm.sss_status_notify = sss_status_notify
tm.sss_data_notify = sss_data_notify
......@@ -137,12 +163,17 @@ func (tm *SssMgr) init() {
sensorsMap = make(map[string]SensorDiscoveryInfo, 0)
sensorsPerPlatformMap = make(map[string][]string, 0)
tm.refreshTicker = nil
}
// DeleteSssMgr -
func (tm *SssMgr) DeleteSssMgr() (err error) {
tm.stopRefreshTicker()
if protocol != nil {
protocol.uninit()
protocol = nil
}
return nil
}
......@@ -283,207 +314,341 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error {
profilingTimers["populateSensors"] = time.Now()
}
log.Info(">>> populateSensors: iotPlatformId=", iotPlatformInfo.Address)
log.Info(">>> populateSensors: iotPlatformInfo=", iotPlatformInfo)
// 1. Get the list of the AE
// Build the URL
url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + iotPlatformInfo.Name
log.Debug("populateSensors: url=", url)
// Build the headers
var headers = http.Header{}
headers["Accept"] = []string{headerAccept}
headers["Content-Type"] = []string{headerContentType}
headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it
headers["X-M2M-RI"] = []string{uuid.New().String()}
headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it
// Build the context
var ctx = SssMgrBindingProtocolContext{
host: iotPlatformInfo.Address,
port: iotPlatformInfo.Port,
name: iotPlatformInfo.Name,
to: iotPlatformInfo.Name,
from: "Admin", // FIXME FSCOM How to get it
op: 2, // RETRIEVE
ty: -1,
rqi: uuid.New().String(),
rvi: []string{"4"}, // FIXME FSCOM How to get it
code: 200,
}
// Build the queries
queries := map[string]string{}
queries["fu"] = "1" // Filter usage
queries["ty"] = "4" // Filter on oneM2M CIN for sensors
// Send the request
response, err := sendRequest("GET", url, headers, nil, nil, queries, 200)
queries["ty"] = "3" // Filter on oneM2M CIN for sensors
ctx.queries = queries
err, resp := protocol.send(ctx)
if err != nil {
log.Error("populateSensors: ", err.Error())
return err
}
log.Debug("populateSensors: response: ", string(response))
var oneM2M_uril map[string][]string
err = json.Unmarshal(response, &oneM2M_uril)
if err != nil {
log.Error("populateSensors: ", err.Error())
return err
}
log.Debug("populateSensors: oneM2M_uril: ", len(oneM2M_uril))
log.Debug(oneM2M_uril)
if _, ok := oneM2M_uril["m2m:uril"]; !ok {
err := errors.New("populateSensors: CharacteristicName not found: m2m:uril")
log.Error(err.Error())
log.Error("oneM2M_create: ", err.Error())
return err
}
log.Debug("populateSensors: resp: ", resp)
log.Debug("populateSensors: TypeOf(resp): ", reflect.TypeOf(resp))
oneM2M_uril := resp.(map[string]interface{})
log.Debug("populateSensors: oneM2M_uril: ", oneM2M_uril)
log.Debug("populateSensors: TypeOf(oneM2M_uril): ", reflect.TypeOf(oneM2M_uril))
log.Debug("populateSensors: len(oneM2M_uril): ", len(oneM2M_uril))
// Loop for each CIN and build the sensor list
for _, v := range oneM2M_uril["m2m:uril"] {
log.Debug("populateSensors: Processing key: ", v)
url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + v
log.Debug("populateSensors: url=", url)
// Build the headers
var headers = http.Header{}
headers["Accept"] = []string{headerAccept}
headers["Content-Type"] = []string{headerContentType}
headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it
headers["X-M2M-RI"] = []string{uuid.New().String()}
headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it
// Build the queries
queries := map[string]string{}
queries["fu"] = "2" // Filter usage
// Send the request
response, err := sendRequest("GET", url, headers, nil, nil, queries, 200)
if err != nil {
log.Error("populateSensors: ", err.Error())
return err
for _, v := range oneM2M_uril["m2m:uril"].([]interface{}) {
log.Debug("populateSensors: Processing key: v: ", v)
log.Debug("populateSensors: Processing key: TypeOf(v): ", reflect.TypeOf(v))
s := v.(string)
if s == "laboai-cse-in/acpCreateACPs" || s == "laboai-cse-in/CAdmin" {
// FIXME FSCOM Bug in MQTT DISCOVERY request which does not provide the same response that HTTP DISCOVERY with the same filter criteria
continue // Discard it
}
log.Debug("populateSensors: response: ", string(response))
var oneM2M_cin map[string]map[string]interface{}
err = json.Unmarshal(response, &oneM2M_cin)
ctx.to = s
ctx.queries["fu"] = "2"
err, resp := protocol.send(ctx)
if err != nil {
log.Error("populateSensors: ", err.Error())
log.Error("oneM2M_create: ", err.Error())
continue
}
log.Debug("populateSensors: resp: ", resp)
log.Debug("populateSensors: type(resp): ", reflect.TypeOf(resp))
if resp.(map[string]interface{}) == nil || resp.(map[string]interface{})["m2m:cnt"] == nil {
continue
}
oneM2M_cin := resp.(map[string]interface{})["m2m:cnt"].(map[string]interface{})
log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin))
log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin))
log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin)
for _, m := range oneM2M_cin {
//log.Debug("==> ", i, " value is ", m)
var sensor = SensorDiscoveryInfo{
IotPlatformId: iotPlatformInfo.IotPlatformId,
}
var sensor = SensorDiscoveryInfo{
IotPlatformId: iotPlatformInfo.IotPlatformId,
}
for k, v := range oneM2M_cin {
log.Debug(k, " value is ", v)
log.Debug("populateSensors: type(v): ", reflect.TypeOf(v))
// m is a map[string]interface.
// loop over keys and values in the map.
for k, v := range m {
log.Debug(k, " value is ", v)
log.Debug("populateSensors: type(v): ", reflect.TypeOf(v))
if k == "ri" {
if item, ok := v.(string); ok {
sensor.SensorIdentifier = item
} else {
log.Error("populateSensors: Failed to process ", k)
}
} else if k == "ty" {
if item, ok := v.(float64); ok {
sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64)
} else {
log.Error("populateSensors: Failed to process ", k)
}
if k == "ri" {
if item, ok := v.(string); ok {
sensor.SensorIdentifier = item
} else {
log.Error("populateSensors: Failed to process ", k)
}
} else if k == "ty" {
if item, ok := v.(float64); ok {
sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64)
} else {
sensor.SensorPropertyList = append(sensor.SensorPropertyList, k)
if item, ok := v.(string); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: string(item),
})
} else if item, ok := v.(float64); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64),
})
} else if item, ok := v.(int64); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatInt(item, 10),
})
} else if item, ok := v.(bool); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatBool(item),
})
} else if item, ok := v.([]string); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strings.Join(item, ","),
})
} else if item, ok := v.([]int64); ok {
log.Error("populateSensors: Failed to convert list of int64 into string: ", item)
} else if item, ok := v.([]interface{}); ok {
log.Debug("populateSensors: Got []interface {} for ", k)
log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item))
s := SensorCharacteristic{
CharacteristicName: k,
}
var buf bytes.Buffer
fmt.Fprintf(&buf, "%T", reflect.ValueOf(item))
s.CharacteristicValue = buf.String()
sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s)
} else {
log.Error("populateSensors: Failed to process ", k)
log.Error("populateSensors: Failed to process ", k)
}
} else {
sensor.SensorPropertyList = append(sensor.SensorPropertyList, k)
if item, ok := v.(string); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: string(item),
})
} else if item, ok := v.(float64); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64),
})
} else if item, ok := v.(int64); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatInt(item, 10),
})
} else if item, ok := v.(bool); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strconv.FormatBool(item),
})
} else if item, ok := v.([]string); ok {
sensor.SensorCharacteristicList = append(
sensor.SensorCharacteristicList,
SensorCharacteristic{
CharacteristicName: k,
CharacteristicValue: strings.Join(item, ","),
})
} else if item, ok := v.([]int64); ok {
log.Error("populateSensors: Failed to convert list of int64 into string: ", item)
} else if item, ok := v.([]interface{}); ok {
log.Debug("populateSensors: Got []interface {} for ", k)
log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item))
s := SensorCharacteristic{
CharacteristicName: k,
}
var buf bytes.Buffer
fmt.Fprintf(&buf, "%T", reflect.ValueOf(item))
s.CharacteristicValue = buf.String()
sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s)
} else {
log.Error("populateSensors: Failed to process ", k)
}
// if k == "rn" {
// if item, ok := v.(string); ok {
// sensor.DeviceId = item
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// } else if k == "ri" {
// if item, ok := v.(string); ok {
// sensor.SensorIdentifier = item
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// } else if k == "ty" {
// if item, ok := v.(float64); ok {
// sensor.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64)
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" {
// if item, ok := v.(string); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: string(item),
// CharacteristicUnitOfMeasure: nil,
// })
// } else if item, ok := v.(float64); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64),
// CharacteristicUnitOfMeasure: nil,
// })
// } else if item, ok := v.([]string); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strings.Join(item, ","),
// CharacteristicUnitOfMeasure: nil,
// })
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// }
} // End of 'for' loop
log.Info("populateSensors: sensor: ", sensor)
sensorsMap[sensor.SensorIdentifier] = sensor
sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier)
} // End of 'for' loop
}
} // End of 'for' statement
log.Info("populateSensors: sensor: ", sensor)
sensorsMap[sensor.SensorIdentifier] = sensor
sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier)
} // End of 'for' statement
// // 1. Get the list of the AE
// // Build the URL
// url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + iotPlatformInfo.Name
// log.Debug("populateSensors: url=", url)
// // Build the headers
// var headers = http.Header{}
// headers["Accept"] = []string{headerAccept}
// headers["Content-Type"] = []string{headerContentType}
// headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it
// headers["X-M2M-RI"] = []string{uuid.New().String()}
// headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it
// // Build the queries
// queries := map[string]string{}
// queries["fu"] = "1" // Filter usage
// queries["ty"] = "4" // Filter on oneM2M CIN for sensors
// // Send the request
// response, err := sendRequest("GET", url, headers, nil, nil, queries, 200)
// if err != nil {
// log.Error("populateSensors: ", err.Error())
// return err
// }
// log.Debug("populateSensors: response: ", string(response))
// var oneM2M_uril map[string][]string
// err = json.Unmarshal(response, &oneM2M_uril)
// if err != nil {
// log.Error("populateSensors: ", err.Error())
// return err
// }
// log.Debug("populateSensors: oneM2M_uril: ", len(oneM2M_uril))
// log.Debug(oneM2M_uril)
// if _, ok := oneM2M_uril["m2m:uril"]; !ok {
// err := errors.New("populateSensors: CharacteristicName not found: m2m:uril")
// log.Error(err.Error())
// return err
// }
// // Loop for each CIN and build the sensor list
// for _, v := range oneM2M_uril["m2m:uril"] {
// log.Debug("populateSensors: Processing key: ", v)
// url := "http://" + iotPlatformInfo.Address + ":" + strconv.Itoa(int(iotPlatformInfo.Port)) + "/" + v
// log.Debug("populateSensors: url=", url)
// // Build the headers
// var headers = http.Header{}
// headers["Accept"] = []string{headerAccept}
// headers["Content-Type"] = []string{headerContentType}
// headers["X-M2M-Origin"] = []string{"CAdmin"} // FIXME FSCOM How to get it
// headers["X-M2M-RI"] = []string{uuid.New().String()}
// headers["X-M2M-RVI"] = []string{"4"} // FIXME FSCOM How to get it
// // Build the queries
// queries := map[string]string{}
// queries["fu"] = "2" // Filter usage
// // Send the request
// response, err := sendRequest("GET", url, headers, nil, nil, queries, 200)
// if err != nil {
// log.Error("populateSensors: ", err.Error())
// return err
// }
// log.Debug("populateSensors: response: ", string(response))
// var oneM2M_cin map[string]map[string]interface{}
// err = json.Unmarshal(response, &oneM2M_cin)
// if err != nil {
// log.Error("populateSensors: ", err.Error())
// continue
// }
// log.Debug("populateSensors: type(oneM2M_cin): ", reflect.TypeOf(oneM2M_cin))
// log.Debug("populateSensors: len(oneM2M_cin): ", len(oneM2M_cin))
// log.Debug("populateSensors: oneM2M_cin: ", oneM2M_cin)
// for _, m := range oneM2M_cin {
// //log.Debug("==> ", i, " value is ", m)
// var sensor = SensorDiscoveryInfo{
// IotPlatformId: iotPlatformInfo.IotPlatformId,
// }
// // m is a map[string]interface.
// // loop over keys and values in the map.
// for k, v := range m {
// log.Debug(k, " value is ", v)
// log.Debug("populateSensors: type(v): ", reflect.TypeOf(v))
// if k == "ri" {
// if item, ok := v.(string); ok {
// sensor.SensorIdentifier = item
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// } else if k == "ty" {
// if item, ok := v.(float64); ok {
// sensor.SensorType = strconv.FormatFloat(item, 'f', -1, 64)
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// } else {
// sensor.SensorPropertyList = append(sensor.SensorPropertyList, k)
// if item, ok := v.(string); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: string(item),
// })
// } else if item, ok := v.(float64); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64),
// })
// } else if item, ok := v.(int64); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strconv.FormatInt(item, 10),
// })
// } else if item, ok := v.(bool); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strconv.FormatBool(item),
// })
// } else if item, ok := v.([]string); ok {
// sensor.SensorCharacteristicList = append(
// sensor.SensorCharacteristicList,
// SensorCharacteristic{
// CharacteristicName: k,
// CharacteristicValue: strings.Join(item, ","),
// })
// } else if item, ok := v.([]int64); ok {
// log.Error("populateSensors: Failed to convert list of int64 into string: ", item)
// } else if item, ok := v.([]interface{}); ok {
// log.Debug("populateSensors: Got []interface {} for ", k)
// log.Debug("populateSensors: ValueOf ", reflect.ValueOf(item))
// s := SensorCharacteristic{
// CharacteristicName: k,
// }
// var buf bytes.Buffer
// fmt.Fprintf(&buf, "%T", reflect.ValueOf(item))
// s.CharacteristicValue = buf.String()
// sensor.SensorCharacteristicList = append(sensor.SensorCharacteristicList, s)
// } else {
// log.Error("populateSensors: Failed to process ", k)
// }
// }
// // if k == "rn" {
// // if item, ok := v.(string); ok {
// // sensor.DeviceId = item
// // } else {
// // log.Error("populateSensors: Failed to process ", k)
// // }
// // } else if k == "ri" {
// // if item, ok := v.(string); ok {
// // sensor.SensorIdentifier = item
// // } else {
// // log.Error("populateSensors: Failed to process ", k)
// // }
// // } else if k == "ty" {
// // if item, ok := v.(float64); ok {
// // sensor.SensorStatusType = strconv.FormatFloat(item, 'f', -1, 64)
// // } else {
// // log.Error("populateSensors: Failed to process ", k)
// // }
// // } else { // default: if k == "lt" || k == "et" || k == "ct" || k == "st" || k == "pi" || k == "lbl" {
// // if item, ok := v.(string); ok {
// // sensor.SensorCharacteristicList = append(
// // sensor.SensorCharacteristicList,
// // SensorCharacteristic{
// // CharacteristicName: k,
// // CharacteristicValue: string(item),
// // CharacteristicUnitOfMeasure: nil,
// // })
// // } else if item, ok := v.(float64); ok {
// // sensor.SensorCharacteristicList = append(
// // sensor.SensorCharacteristicList,
// // SensorCharacteristic{
// // CharacteristicName: k,
// // CharacteristicValue: strconv.FormatFloat(item, 'f', -1, 64),
// // CharacteristicUnitOfMeasure: nil,
// // })
// // } else if item, ok := v.([]string); ok {
// // sensor.SensorCharacteristicList = append(
// // sensor.SensorCharacteristicList,
// // SensorCharacteristic{
// // CharacteristicName: k,
// // CharacteristicValue: strings.Join(item, ","),
// // CharacteristicUnitOfMeasure: nil,
// // })
// // } else {
// // log.Error("populateSensors: Failed to process ", k)
// // }
// // }
// } // End of 'for' loop
// log.Info("populateSensors: sensor: ", sensor)
// sensorsMap[sensor.SensorIdentifier] = sensor
// sensorsPerPlatformMap[sensor.IotPlatformId] = append(sensorsPerPlatformMap[sensor.IotPlatformId], sensor.SensorIdentifier)
// } // End of 'for' loop
// } // End of 'for' statement
log.Info("populateSensors: sensorsMap: ", sensorsMap)
log.Info("populateSensors: sensorsPerPlatformMap: ", sensorsPerPlatformMap)
......@@ -498,21 +663,7 @@ func (tm *SssMgr) populateSensors(iotPlatformInfo IotPlatformInfo) error {
func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (sensorResp SensorDiscoveryInfo, err error) {
// FIXME FSCOM: requestedIotPlatformId should be useless
// Build the headers
var headers = http.Header{}
headers["Accept"] = []string{headerAccept}
headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it
headers["X-M2M-RI"] = []string{uuid.New().String()}
headers["X-M2M-RVI"] = []string{"4"}
var s string
if type_ == "AE" {
s = headerContentType + ";ty=2"
} else if type_ == "CNT" {
s = headerContentType + ";ty=4"
}
headers["Content-Type"] = []string{s}
// Build the url and the body
var url string
// Create the initial payload dictionary
var bodyMap = map[string]map[string]interface{}{}
// Initialize the entry
if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer
......@@ -521,8 +672,6 @@ func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatform
bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier
bodyMap["m2m:ae"]["rr"] = false
bodyMap["m2m:ae"]["srv"] = []string{"4"}
url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name
} else if type_ == "CNT" {
bodyMap["m2m:cnt"] = make(map[string]interface{}, 0)
bodyMap["m2m:cnt"]["mbs"] = 10000
......@@ -536,42 +685,116 @@ func (tm *SssMgr) oneM2M_create(sensor SensorDiscoveryInfo, requestedIotPlatform
// FIXME FSCOM Add metadata
} // End of 'for' statement
}
url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name
} else {
err = errors.New("oneM2M_create: Invalid type")
log.Error("oneM2M_create: ", err.Error())
return sensorResp, err
}
log.Debug("oneM2M_create: url=", url)
log.Debug("oneM2M_create: bodyMap=", bodyMap)
body, err := json.Marshal(bodyMap)
if err != nil {
log.Error("oneM2M_create: ", err.Error())
return sensorResp, err
// Send it and get the result
var ctx = SssMgrBindingProtocolContext{
host: registeredIotPlatformsMap[requestedIotPlatformId].Address,
port: registeredIotPlatformsMap[requestedIotPlatformId].Port,
name: registeredIotPlatformsMap[requestedIotPlatformId].Name,
to: registeredIotPlatformsMap[requestedIotPlatformId].Name,
from: requestedIotPlatformId,
op: 1, // CREATE
rqi: uuid.New().String(),
rvi: []string{"4"}, // FIXME FSCOM How to get it
body: bodyMap,
code: 201,
}
log.Debug("oneM2M_create: Request body: ", string(body))
// Send the request
response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201)
if err != nil {
log.Error("oneM2M_create: ", err.Error())
if type_ == "AE" {
ctx.ty = 2
} else if type_ == "CNT" {
ctx.ty = 4
} else {
err = errors.New("oneM2M_create: Invalid type")
log.Error("send: ", err.Error())
return sensorResp, err
}
log.Debug("oneM2M_create: response: ", string(response))
var d map[string]map[string]interface{}
err = json.Unmarshal(response, &d)
//var resp = map[string]map[string]interface{}{}
err, resp := protocol.send(ctx)
if err != nil {
log.Error("oneM2M_create: ", err.Error())
return sensorResp, err
}
log.Debug("oneM2M_create: d: ", d)
// Build the headers
// var headers = http.Header{}
// headers["Accept"] = []string{headerAccept}
// headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it
// headers["X-M2M-RI"] = []string{uuid.New().String()}
// headers["X-M2M-RVI"] = []string{"4"}
// var s string
// if type_ == "AE" {
// s = headerContentType + ";ty=2"
// } else if type_ == "CNT" {
// s = headerContentType + ";ty=4"
// }
// headers["Content-Type"] = []string{s}
// // Build the url and the body
// var url string
// var bodyMap = map[string]map[string]interface{}{}
// Initialize the entry
// if type_ == "AE" { // FIXME FSCOM Clarify how to map Deviceinfo with oneM2M AE/CNT/fexContainer
// bodyMap["m2m:ae"] = make(map[string]interface{}, 0)
// bodyMap["m2m:ae"]["api"] = "Norg.etsi." + requestedIotPlatformId + "." + sensor.SensorIdentifier
// bodyMap["m2m:ae"]["rn"] = sensor.SensorIdentifier
// bodyMap["m2m:ae"]["rr"] = false
// bodyMap["m2m:ae"]["srv"] = []string{"4"}
// url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name
// } else if type_ == "CNT" {
// bodyMap["m2m:cnt"] = make(map[string]interface{}, 0)
// bodyMap["m2m:cnt"]["mbs"] = 10000
// bodyMap["m2m:cnt"]["mni"] = 10
// bodyMap["m2m:cnt"]["rn"] = sensor.SensorIdentifier
// bodyMap["m2m:cnt"]["srv"] = []string{"4"}
// // Add metadata
// if len(sensor.SensorCharacteristicList) != 0 {
// for _, val := range sensor.SensorCharacteristicList {
// log.Debug("oneM2M_create: Adding CNT metadata: ", val)
// // FIXME FSCOM Add metadata
// } // End of 'for' statement
// }
// url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name
// } else {
// err = errors.New("oneM2M_create: Invalid type")
// log.Error("oneM2M_create: ", err.Error())
// return sensorResp, err
// }
// log.Debug("oneM2M_create: url=", url)
// log.Debug("oneM2M_create: bodyMap=", bodyMap)
// body, err := json.Marshal(bodyMap)
// if err != nil {
// log.Error("oneM2M_create: ", err.Error())
// return sensorResp, err
// }
// log.Debug("oneM2M_create: Request body: ", string(body))
// Send the request
// response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201)
// if err != nil {
// log.Error("oneM2M_create: ", err.Error())
// return sensorResp, err
// }
// log.Debug("oneM2M_create: response: ", string(response))
// var d map[string]map[string]interface{}
// err = json.Unmarshal(response, &d)
// if err != nil {
// log.Error("oneM2M_create: ", err.Error())
// return sensorResp, err
// }
log.Debug("oneM2M_create: d: ", resp)
log.Debug("oneM2M_create: TypeOf(d): ", reflect.TypeOf(resp))
// Add additional entries
sensorResp, err = tm.oneM2M_deserialize(sensorResp, d)
if err != nil {
log.Error("oneM2M_create: ", err.Error())
return sensorResp, err
}
log.Debug("oneM2M_create: sensorResp: ", sensorResp)
// sensorResp, err = tm.oneM2M_deserialize(sensorResp, resp)
// if err != nil {
// log.Error("oneM2M_create: ", err.Error())
// return sensorResp, err
// }
// log.Debug("oneM2M_create: sensorResp: ", sensorResp)
return sensorResp, nil
}
......@@ -683,6 +906,54 @@ func (tm *SssMgr) oneM2M_get(sensor SensorDiscoveryInfo, requestedIotPlatformId
return sensorResp, nil
}
func (tm *SssMgr) oneM2M_subscribe(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (subscription string, err error) {
// FIXME FSCOM: requestedIotPlatformId should be useless
// Build the headers
var headers = http.Header{}
headers["Accept"] = []string{headerAccept}
headers["X-M2M-Origin"] = []string{"C" + requestedIotPlatformId} // FIXME FSCOM How to get it
headers["X-M2M-RI"] = []string{uuid.New().String()}
headers["X-M2M-RVI"] = []string{"4"}
s := headerContentType + ";ty=23"
headers["Content-Type"] = []string{s}
// Build the url and the body
var url string
var bodyMap = map[string]map[string]interface{}{}
bodyMap["m2m:sub"] = make(map[string]interface{}, 0)
net := make(map[string][4]int)
net["net"] = [4]int{1, 2, 3, 4}
bodyMap["m2m:sub"]["enc"] = net
bodyMap["m2m:sub"]["nu"] = "" // FIXME The URI of the listener
bodyMap["m2m:sub"]["rn"] = sensor.SensorIdentifier
url = "http://" + registeredIotPlatformsMap[requestedIotPlatformId].Address + ":" + strconv.Itoa(int(registeredIotPlatformsMap[requestedIotPlatformId].Port)) + "/" + registeredIotPlatformsMap[requestedIotPlatformId].Name
log.Debug("oneM2M_subscribe: url=", url)
log.Debug("oneM2M_subscribe: bodyMap=", bodyMap)
body, err := json.Marshal(bodyMap)
if err != nil {
log.Error("oneM2M_subscribe: ", err.Error())
return "", err
}
log.Debug("oneM2M_subscribe: Request body: ", string(body))
// Send the request
response, err := sendRequest("POST", url, headers, bytes.NewBuffer(body), nil, nil, 201)
if err != nil {
log.Error("oneM2M_subscribe: ", err.Error())
return "", err
}
log.Debug("oneM2M_subscribe: response: ", string(response))
var d map[string]map[string]interface{}
err = json.Unmarshal(response, &d)
if err != nil {
log.Error("oneM2M_subscribe: ", err.Error())
return "", err
}
log.Debug("oneM2M_subscribe: d: ", d)
err = errors.New("oneM2M_subscribe: To be implemented")
return "", err /*nil*/
}
func (tm *SssMgr) oneM2M_delete(sensor SensorDiscoveryInfo, requestedIotPlatformId string, type_ string) (err error) {
// FIXME FSCOM: requestedIotPlatformId should be useless
......@@ -809,45 +1080,3 @@ func (tm *SssMgr) oneM2M_deserialize(sensor SensorDiscoveryInfo, response map[st
return sensorResp, nil
}
func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) {
//log.Debug(">>> sendRequest: url: ", url)
//log.Debug(">>> sendRequest: headers: ", headers)
req, err := http.NewRequest(method, url, body)
if err != nil || req == nil {
return nil, err
}
if vars != nil {
req = mux.SetURLVars(req, vars)
}
if query != nil {
q := req.URL.Query()
for k, v := range query {
q.Add(k, v)
}
req.URL.RawQuery = q.Encode()
}
req.Header = headers
req.Close = true
//log.Debug("sendRequest: req: ", req)
rr, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
// Check the status code is what we expect.
//log.Debug("sendRequest: rr: ", rr)
if status := rr.StatusCode; status != code {
s := fmt.Sprintf("Wrong status code - got %v want %v", status, code)
return nil, errors.New(s)
}
responseData, err := ioutil.ReadAll(rr.Body)
if err != nil {
return nil, err
}
//log.Debug("sendRequest: responseData: ", responseData)
return responseData, nil
}
......@@ -18,7 +18,6 @@ package sssmgr
import (
"fmt"
"math/rand"
"reflect"
"testing"
......@@ -34,107 +33,202 @@ func TestNewSssMgr(t *testing.T) {
// Invalid Connector
fmt.Println("Invalid SSS Asset Manager")
tm, err := NewSssMgr("", tmNamespace, nil, nil, nil)
tm, err := NewSssMgr("", tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("DB connection should have failed")
t.Fatalf("Service name not set")
}
tm, err = NewSssMgr(tmName, tmNamespace, "", "172.29.10.56", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("Binding protocol not set")
}
tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("Binding protocol not set")
}
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err = NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
}
tm = nil
func TestPopulateDevicesPerIotPlatforms(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err = NewSssMgr(tmName, tmNamespace, "HTTP", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
tm = nil
}
func TestSensorDiscoveryInfoAll(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// func TestPopulateDevicesPerIotPlatformsHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
sensors, err := tm.SensorDiscoveryInfoAll()
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensors: ", sensors)
// func TestSensorDiscoveryInfoAllHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
}
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("len=", len(sensors))
// fmt.Println("sensors", sensors)
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestGetSensorHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// for _, v := range sensors {
// fmt.Println("v", v)
// fmt.Println("TypeOf(v)", reflect.TypeOf(v))
// sensor, err := tm.GetSensor(v.SensorIdentifier)
// if !validate_sensor_discovery_info(v, sensor) {
// t.Fatalf(err.Error())
// }
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestPopulateDevicesPerIotPlatformsMqtt(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestSensorDiscoveryInfoAllMqtt(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensors: ", sensors)
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
func TestGetSensor(t *testing.T) {
func TestGetSensorMqtt(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
sensors, err := tm.SensorDiscoveryInfoAll()
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensors: ", sensors)
idx := rand.Int31n(int32(len(sensors)))
sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier)
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensor: ", sensor)
if !validate_sensor_discovery_info(sensors[idx], sensor) {
t.Fatalf("Value mismatch")
for _, v := range sensors {
fmt.Println("v", v)
fmt.Println("TypeOf(v)", reflect.TypeOf(v))
sensor, err := tm.GetSensor(v.SensorIdentifier)
if !validate_sensor_discovery_info(v, sensor) {
t.Fatalf(err.Error())
}
}
// Cleanup
......@@ -142,15 +236,76 @@ func TestGetSensor(t *testing.T) {
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
tm = nil
}
// func TestVaidateOneM2MNotificationServer(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// tm.init()
// fmt.Println("Waiting for 2 minutes to do curl request: curl -v http://mec-platform.etsi.org:33122/sbxykqjr17/mep1/sens/v1 ")
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
// func TestGetSensor(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensors: ", sensors)
// idx := rand.Int31n(int32(len(sensors)))
// sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier)
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensor: ", sensor)
// if !validate_sensor_discovery_info(sensors[idx], sensor) {
// t.Fatalf("Value mismatch")
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
// func TestOneM2mCreateAEAndCNT(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
......
......@@ -161,9 +161,9 @@ func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string,
log.Error(err.Error())
return err
}
log.Info("message_broker_simu: Send: Publish content : ", content)
log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization)
log.Info("message_broker_mqtt: Send: Publish content : ", content)
log.Info("message_broker_mqtt: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_mqtt: Send: stdOrganization: ", stdOrganization)
token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
token.Wait()
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment