Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found

Target

Select target project
  • mec/etsi-mec-sandbox
1 result
Show changes
Commits on Source (2)
......@@ -20,4 +20,8 @@ COPY ./data /
RUN chmod +x /entrypoint.sh
RUN dpkg --configure -a
EXPOSE 33122/tcp
ENTRYPOINT ["/entrypoint.sh"]
package sssmgr
type SssMgrBindingProtocol interface {
init(tm *SssMgr) (err error)
send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{})
//(p_op string, p_type string, p_to string, p_from string, p_ri string, p_rev []string, p_bodyMap map[string]map[string]interface{}) (err error, resp map[string]map[string]interface{})
notify(p_resp string) (err error)
uninit() (err error)
}
type SssMgrBindingProtocolContext struct {
host string
port int
name string
to string
from string
op int
ty int
rqi string
rvi []string
queries map[string]string
body map[string]map[string]interface{}
code int
}
/**
TS-0001 V4.22 Table 9.6.2.3-1: Types of parameters in accessControlOperations
TS-0004 V4.22 Table 6.3.4.2.5 1: Interpretation of m2m:operation
**/
package sssmgr
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"io"
"io/ioutil"
"net/http"
"net/url"
"reflect"
"strconv"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
"github.com/gorilla/mux"
)
type SssMgrHttp struct {
}
const (
headerAccept = "application/json"
headerContentType = "application/json"
)
func NewSssMgrHttp() (http_mgr *SssMgrHttp) {
log.Info(">>> NewSssMgrHttp")
return new(SssMgrHttp)
}
func (http_mgr *SssMgrHttp) init(tm *SssMgr) (err error) {
log.Info(">>> init")
// log.Info("Init: Starting OneM2M Notification server")
// go func() {
// http.HandleFunc("/", tm.handleRoot)
// err := http.ListenAndServe(":33122", nil)
// if err != nil {
// log.Error(err.Error())
// return
// }
// log.Info("<<< Init: Terminating OneM2M Notification server")
// }()
return nil
}
func (http_mgr *SssMgrHttp) handleRoot(w http.ResponseWriter, r *http.Request) {
log.Debug(">>> handleRoot: ", r)
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
// get & validate query param values for sensorIdentifier
u, _ := url.Parse(r.URL.String())
log.Info("url: ", u.RequestURI())
w.WriteHeader(http.StatusOK)
}
func (http_mgr *SssMgrHttp) uninit() (err error) {
log.Info(">>> uninit")
return nil
}
func (http_mgr *SssMgrHttp) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
log.Info(">>> send: ", p_ctx)
// Build the headers
var headers = http.Header{}
headers["Accept"] = []string{headerAccept}
headers["X-M2M-Origin"] = []string{"C" + p_ctx.from}
headers["X-M2M-RI"] = []string{p_ctx.rqi}
headers["X-M2M-RVI"] = p_ctx.rvi
s := headerContentType
if p_ctx.ty != -1 {
s = s + ";ty=" + strconv.Itoa(p_ctx.ty)
}
headers["Content-Type"] = []string{s}
// Build the url
url := "http://" + p_ctx.host + ":" + strconv.Itoa(p_ctx.port) + "/" + p_ctx.to
// Set the method
method := ""
if p_ctx.op == 1 {
method = "POST"
} else if p_ctx.op == 2 {
method = "GET"
} else if p_ctx.op == 3 {
method = "PATCH"
} else if p_ctx.op == 4 {
method = "DELETE"
} else {
err := errors.New("Invalid p_ctx.op")
log.Error("send: ", err.Error())
return err, nil
}
// Finalize the body
log.Debug("send: url=", url)
if p_ctx.body != nil { // With body
log.Debug("send: p_ctx.body=", p_ctx.body)
body, err := json.Marshal(p_ctx.body)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: Request body: ", string(body))
response, err := sendRequest(method, url, headers, bytes.NewBuffer(body), nil, p_ctx.queries, 201)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: response: ", string(response))
err = json.Unmarshal(response, &resp)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: response: ", resp)
log.Debug("send: TypeOf(response): ", reflect.TypeOf(resp))
} else { // Without body
response, err := sendRequest(method, url, headers, nil, nil, p_ctx.queries, p_ctx.code)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: response: ", string(response))
err = json.Unmarshal(response, &resp)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: response: ", resp)
log.Debug("send: TypeOf(response): ", reflect.TypeOf(resp))
}
return nil, resp
}
func (http_mgr *SssMgrHttp) notify(p_resp string) (err error) {
log.Info(">>> notify")
return nil
}
func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) {
//log.Debug(">>> sendRequest: url: ", url)
//log.Debug(">>> sendRequest: headers: ", headers)
req, err := http.NewRequest(method, url, body)
if err != nil || req == nil {
return nil, err
}
if vars != nil {
req = mux.SetURLVars(req, vars)
}
if query != nil {
q := req.URL.Query()
for k, v := range query {
q.Add(k, v)
}
req.URL.RawQuery = q.Encode()
}
req.Header = headers
req.Close = true
//log.Debug("sendRequest: req: ", req)
rr, err := http.DefaultClient.Do(req)
if err != nil {
return nil, err
}
// Check the status code is what we expect.
//log.Debug("sendRequest: rr: ", rr)
if status := rr.StatusCode; status != code {
s := fmt.Sprintf("Wrong status code - got %v want %v", status, code)
return nil, errors.New(s)
}
responseData, err := ioutil.ReadAll(rr.Body)
if err != nil {
return nil, err
}
//log.Debug("sendRequest: responseData: ", responseData)
return responseData, nil
}
package sssmgr
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"sync"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type SssMgrMqtt struct {
running bool
opts *mqtt.ClientOptions
client mqtt.Client
sent_topic string
onem2m_notify func(p_topic string, p_payload []byte)
}
var _onem2m_notify func(p_topic string, p_payload []byte)
var _sync_response *sync.WaitGroup = nil // Used to synchronize the response
var _responses map[uint16]mqtt.Message
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
go func() {
log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
// if _onem2m_notify != nil {
// _onem2m_notify(msg.Topic(), msg.Payload())
// } else {
// log.Info("onMessageReceived: null pointer for the callbacl")
// }
}()
}
func onMessageReceivedReq(client mqtt.Client, msg mqtt.Message) {
go func() {
log.Info("onMessageReceivedReq: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
if _onem2m_notify != nil {
_onem2m_notify(msg.Topic(), msg.Payload())
} else {
log.Info("onMessageReceivedReq: null pointer for the callbacl")
}
}()
}
func onMessageReceivedResp(client mqtt.Client, msg mqtt.Message) {
go func() {
log.Info("onMessageReceivedResp: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
defer _sync_response.Done()
_responses[msg.MessageID()] = msg
}()
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
log.Info("mqtt.OnConnectHandler: Connected")
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Info("Connect lost:", err)
}
func NewSssMgrMqtt() (broker_mqtt *SssMgrMqtt) {
log.Info(">>> NewSssMgrMqtt")
return new(SssMgrMqtt)
}
func (broker_mqtt *SssMgrMqtt) init(tm *SssMgr) (err error) {
log.Info(">>> init")
broker_mqtt.running = false
broker_mqtt.opts = mqtt.NewClientOptions()
broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
broker_mqtt.opts.OnConnect = connectHandler
broker_mqtt.opts.OnConnectionLost = connectLostHandler
broker_mqtt.opts.CleanSession = true
broker_mqtt.opts.SetUsername("")
broker_mqtt.opts.SetPassword("")
log.Info("init: Add brocker: ", fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)
log.Info("init: Connect to MQTT server...")
token := broker_mqtt.client.Connect()
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
// Subscribe
log.Info("init: Subscribe to: ", "oneM2M/req/+")
token = broker_mqtt.client.Subscribe("oneM2M/req/+", 0, onMessageReceivedReq) // qos:0
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
// token.Wait()
log.Info("init: Subscribe to: ", "/oneM2M/resp/#")
token = broker_mqtt.client.Subscribe("/oneM2M/resp/#", 0, onMessageReceivedResp) // qos:0
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
_onem2m_notify = broker_mqtt.onem2m_notify
if _onem2m_notify == nil {
log.Error("init: _onem2m_notify is nil")
}
_sync_response = new(sync.WaitGroup) // Used to synchronize the response
_responses = make(map[uint16]mqtt.Message, 0)
broker_mqtt.running = true
log.Info("init: Client is connected")
return nil
}
func (broker_mqtt *SssMgrMqtt) uninit() (err error) {
log.Info(">>> uninit")
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
token := broker_mqtt.client.Unsubscribe("/oneM2M")
if token.Error() != nil {
log.Error(token.Error())
// Continue
}
token.Wait()
broker_mqtt.client.Disconnect(250)
broker_mqtt.running = false
_sync_response = nil
_responses = nil
return nil
}
func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
log.Info(">>> send")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err, nil
}
// Prepare the topic
broker_mqtt.sent_topic = "/oneM2M/req/" + "C" + p_ctx.from + "/" + p_ctx.name + "/json" // FIXME FSCOM Use parameter for type
// Complete payload dictionary
var body = map[string]interface{}{}
body["op"] = p_ctx.op
if err != nil {
log.Error(err.Error())
return err, nil
}
if p_ctx.ty != -1 {
body["ty"] = p_ctx.ty
}
body["fr"] = "C" + p_ctx.from
body["to"] = p_ctx.to
body["rqi"] = p_ctx.rqi
body["rvi"] = p_ctx.rvi[0]
if p_ctx.body != nil {
body["pc"] = p_ctx.body
}
if p_ctx.queries != nil && len(p_ctx.queries) != 0 {
d := make(map[string]int, 0)
for k, v := range p_ctx.queries {
if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}"
body[k], err = strconv.Atoi(v)
if err != nil {
log.Error(err.Error())
return err, nil
}
continue
}
d[k], err = strconv.Atoi(v)
if err != nil {
log.Error(err.Error())
return err, nil
}
} // End of 'for' statement
d["fo"] = 1
body["fc"] = d
}
log.Debug("send: body=", body)
content, err := json.Marshal(body)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: content: ", string(content))
log.Debug("send: topic: ", broker_mqtt.sent_topic)
token := broker_mqtt.client.Publish(broker_mqtt.sent_topic, 0, false, string(content))
token.Wait()
// Syncronization with the response
log.Debug("send: Start waiting for the response")
_sync_response.Add(1)
_sync_response.Wait()
if val, ok := _responses[token.(*mqtt.PublishToken).MessageID()]; ok {
delete(_responses, token.(*mqtt.PublishToken).MessageID())
log.Debug("send: Get the response: ", string(val.Payload()))
var d map[string]interface{}
err = json.Unmarshal(val.Payload(), &d)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
if r, ok := d["pc"]; ok {
log.Debug("send: r: ", r)
log.Debug("send: TypeOf(r): ", reflect.TypeOf(r))
// var b []byte
// b, err = json.Marshal(r)
// if err != nil {
// log.Error("send: ", err.Error())
// return err, nil
// }
// log.Info("send: b: ", b)
// log.Info("send: TypeOf(b): ", reflect.TypeOf(b))
// err = json.Unmarshal(b, &resp)
// if err != nil {
// log.Error("send: ", err.Error())
// return err, nil
// }
return nil, r
}
return err, nil
} else {
log.Info("send: No response for messageID: ", token.(*mqtt.PublishToken).MessageID())
}
return nil, nil
}
func (broker_mqtt *SssMgrMqtt) notify(p_resp string) (err error) {
log.Info(">>> notify")
return nil
}
This diff is collapsed.
......@@ -18,7 +18,6 @@ package sssmgr
import (
"fmt"
"math/rand"
"reflect"
"testing"
......@@ -34,107 +33,202 @@ func TestNewSssMgr(t *testing.T) {
// Invalid Connector
fmt.Println("Invalid SSS Asset Manager")
tm, err := NewSssMgr("", tmNamespace, nil, nil, nil)
tm, err := NewSssMgr("", tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("DB connection should have failed")
t.Fatalf("Service name not set")
}
tm, err = NewSssMgr(tmName, tmNamespace, "", "172.29.10.56", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("Binding protocol not set")
}
tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "", 1883, nil, nil, nil)
if err == nil || tm != nil {
t.Fatalf("Binding protocol not set")
}
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err = NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err = NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
}
tm = nil
func TestPopulateDevicesPerIotPlatforms(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err = NewSssMgr(tmName, tmNamespace, "HTTP", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
tm = nil
}
func TestSensorDiscoveryInfoAll(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// func TestPopulateDevicesPerIotPlatformsHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
sensors, err := tm.SensorDiscoveryInfoAll()
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensors: ", sensors)
// func TestSensorDiscoveryInfoAllHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// Cleanup
err = tm.DeleteSssMgr()
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
}
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("len=", len(sensors))
// fmt.Println("sensors", sensors)
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestGetSensorHttp(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "HTTP", "", 0, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// for _, v := range sensors {
// fmt.Println("v", v)
// fmt.Println("TypeOf(v)", reflect.TypeOf(v))
// sensor, err := tm.GetSensor(v.SensorIdentifier)
// if !validate_sensor_discovery_info(v, sensor) {
// t.Fatalf(err.Error())
// }
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestPopulateDevicesPerIotPlatformsMqtt(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// tm = nil
// }
// func TestSensorDiscoveryInfoAllMqtt(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensors: ", sensors)
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
func TestGetSensor(t *testing.T) {
func TestGetSensorMqtt(t *testing.T) {
fmt.Println("--- ", t.Name())
log.MeepTextLogInit(t.Name())
// Valid Connector
fmt.Println("Create valid SSS Asset Manager")
tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
if err != nil || tm == nil {
t.Fatalf("Failed to create SSS Asset Manager")
}
err = tm.populateDevicesPerIotPlatforms()
if err != nil {
t.Fatalf(err.Error())
}
sensors, err := tm.SensorDiscoveryInfoAll()
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensors: ", sensors)
idx := rand.Int31n(int32(len(sensors)))
sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier)
if err != nil {
t.Fatalf(err.Error())
}
fmt.Println("sensor: ", sensor)
if !validate_sensor_discovery_info(sensors[idx], sensor) {
t.Fatalf("Value mismatch")
for _, v := range sensors {
fmt.Println("v", v)
fmt.Println("TypeOf(v)", reflect.TypeOf(v))
sensor, err := tm.GetSensor(v.SensorIdentifier)
if !validate_sensor_discovery_info(v, sensor) {
t.Fatalf(err.Error())
}
}
// Cleanup
......@@ -142,15 +236,76 @@ func TestGetSensor(t *testing.T) {
if err != nil {
t.Fatalf("Failed to cleanup SSS Asset Manager")
}
tm = nil
}
// func TestVaidateOneM2MNotificationServer(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// tm.init()
// fmt.Println("Waiting for 2 minutes to do curl request: curl -v http://mec-platform.etsi.org:33122/sbxykqjr17/mep1/sens/v1 ")
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
// func TestGetSensor(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
// err = tm.populateDevicesPerIotPlatforms()
// if err != nil {
// t.Fatalf(err.Error())
// }
// sensors, err := tm.SensorDiscoveryInfoAll()
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensors: ", sensors)
// idx := rand.Int31n(int32(len(sensors)))
// sensor, err := tm.GetSensor(sensors[idx].SensorIdentifier)
// if err != nil {
// t.Fatalf(err.Error())
// }
// fmt.Println("sensor: ", sensor)
// if !validate_sensor_discovery_info(sensors[idx], sensor) {
// t.Fatalf("Value mismatch")
// }
// // Cleanup
// err = tm.DeleteSssMgr()
// if err != nil {
// t.Fatalf("Failed to cleanup SSS Asset Manager")
// }
// }
// func TestOneM2mCreateAEAndCNT(t *testing.T) {
// fmt.Println("--- ", t.Name())
// log.MeepTextLogInit(t.Name())
// // Valid Connector
// fmt.Println("Create valid SSS Asset Manager")
// tm, err := NewSssMgr(tmName, tmNamespace, nil, nil, nil)
// tm, err := NewSssMgr(tmName, tmNamespace, "MQTT", "172.29.10.56", 1883, nil, nil, nil)
// if err != nil || tm == nil {
// t.Fatalf("Failed to create SSS Asset Manager")
// }
......
This diff is collapsed.
......@@ -161,9 +161,9 @@ func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string,
log.Error(err.Error())
return err
}
log.Info("message_broker_simu: Send: Publish content : ", content)
log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization)
log.Info("message_broker_mqtt: Send: Publish content : ", content)
log.Info("message_broker_mqtt: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_mqtt: Send: stdOrganization: ", stdOrganization)
token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
token.Wait()
......