package sssmgr import ( "encoding/json" "errors" "fmt" "reflect" "strconv" "sync" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" mqtt "github.com/eclipse/paho.mqtt.golang" ) type SssMgrMqtt struct { running bool opts *mqtt.ClientOptions client mqtt.Client sent_topic string onem2m_notify func(p_topic string, p_payload []byte) } var _onem2m_notify func(p_topic string, p_payload []byte) var _sync_response *sync.WaitGroup = nil // Used to synchronize the response var _responses map[uint16]mqtt.Message func onMessageReceived(client mqtt.Client, msg mqtt.Message) { go func() { log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic()) // if _onem2m_notify != nil { // _onem2m_notify(msg.Topic(), msg.Payload()) // } else { // log.Info("onMessageReceived: null pointer for the callbacl") // } }() } func onMessageReceivedReq(client mqtt.Client, msg mqtt.Message) { go func() { log.Info("onMessageReceivedReq: Received message: ", string(msg.Payload()), " on topic ", msg.Topic()) if _onem2m_notify != nil { _onem2m_notify(msg.Topic(), msg.Payload()) } else { log.Info("onMessageReceivedReq: null pointer for the callbacl") } }() } func onMessageReceivedResp(client mqtt.Client, msg mqtt.Message) { go func() { log.Info("onMessageReceivedResp: Received message: ", string(msg.Payload()), " on topic ", msg.Topic()) defer _sync_response.Done() _responses[msg.MessageID()] = msg }() } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { log.Info("mqtt.OnConnectHandler: Connected") } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { log.Info("Connect lost:", err) } func NewSssMgrMqtt() (broker_mqtt *SssMgrMqtt) { log.Info(">>> NewSssMgrMqtt") return new(SssMgrMqtt) } func (broker_mqtt *SssMgrMqtt) init(tm *SssMgr) (err error) { log.Info(">>> init") broker_mqtt.running = false broker_mqtt.opts = mqtt.NewClientOptions() broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived) broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr") broker_mqtt.opts.OnConnect = connectHandler broker_mqtt.opts.OnConnectionLost = connectLostHandler broker_mqtt.opts.CleanSession = true broker_mqtt.opts.SetUsername("") broker_mqtt.opts.SetPassword("") log.Info("init: Add brocker: ", fmt.Sprintf("tcp://%s:%d", tm.host, tm.port)) broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%d", tm.host, tm.port)) broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts) log.Info("init: Connect to MQTT server...") token := broker_mqtt.client.Connect() if token.Error() != nil { log.Error(token.Error()) return token.Error() } token.Wait() // Subscribe log.Info("init: Subscribe to: ", "oneM2M/req/+") token = broker_mqtt.client.Subscribe("oneM2M/req/+", 0, onMessageReceivedReq) // qos:0 if token.Error() != nil { log.Error(token.Error()) return token.Error() } // token.Wait() log.Info("init: Subscribe to: ", "/oneM2M/resp/#") token = broker_mqtt.client.Subscribe("/oneM2M/resp/#", 0, onMessageReceivedResp) // qos:0 if token.Error() != nil { log.Error(token.Error()) return token.Error() } token.Wait() _onem2m_notify = broker_mqtt.onem2m_notify if _onem2m_notify == nil { log.Error("init: _onem2m_notify is nil") } _sync_response = new(sync.WaitGroup) // Used to synchronize the response _responses = make(map[uint16]mqtt.Message, 0) broker_mqtt.running = true log.Info("init: Client is connected") return nil } func (broker_mqtt *SssMgrMqtt) uninit() (err error) { log.Info(">>> uninit") if !broker_mqtt.running { err := errors.New("MQTT not initialized or diconnected") log.Error(err.Error()) return err } token := broker_mqtt.client.Unsubscribe("/oneM2M") if token.Error() != nil { log.Error(token.Error()) // Continue } token.Wait() broker_mqtt.client.Disconnect(250) broker_mqtt.running = false _sync_response = nil _responses = nil return nil } func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) { log.Info(">>> send") // Sanity checks if !broker_mqtt.running { err := errors.New("MQTT not initialized or diconnected") log.Error(err.Error()) return err, nil } // Prepare the topic broker_mqtt.sent_topic = "/oneM2M/req/" + "C" + p_ctx.from + "/" + p_ctx.name + "/json" // FIXME FSCOM Use parameter for type // Complete payload dictionary var body = map[string]interface{}{} body["op"] = p_ctx.op if err != nil { log.Error(err.Error()) return err, nil } if p_ctx.ty != -1 { body["ty"] = p_ctx.ty } body["fr"] = "C" + p_ctx.from body["to"] = p_ctx.to body["rqi"] = p_ctx.rqi body["rvi"] = p_ctx.rvi[0] if p_ctx.body != nil { body["pc"] = p_ctx.body } if p_ctx.queries != nil && len(p_ctx.queries) != 0 { d := make(map[string]int, 0) for k, v := range p_ctx.queries { if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}" body[k], err = strconv.Atoi(v) if err != nil { log.Error(err.Error()) return err, nil } continue } d[k], err = strconv.Atoi(v) if err != nil { log.Error(err.Error()) return err, nil } } // End of 'for' statement d["fo"] = 1 body["fc"] = d } log.Debug("send: body=", body) content, err := json.Marshal(body) if err != nil { log.Error("send: ", err.Error()) return err, nil } log.Debug("send: content: ", string(content)) log.Debug("send: topic: ", broker_mqtt.sent_topic) token := broker_mqtt.client.Publish(broker_mqtt.sent_topic, 0, false, string(content)) token.Wait() // Syncronization with the response log.Debug("send: Start waiting for the response") _sync_response.Add(1) _sync_response.Wait() if val, ok := _responses[token.(*mqtt.PublishToken).MessageID()]; ok { delete(_responses, token.(*mqtt.PublishToken).MessageID()) log.Debug("send: Get the response: ", string(val.Payload())) var d map[string]interface{} err = json.Unmarshal(val.Payload(), &d) if err != nil { log.Error("send: ", err.Error()) return err, nil } if r, ok := d["pc"]; ok { log.Debug("send: r: ", r) log.Debug("send: TypeOf(r): ", reflect.TypeOf(r)) // var b []byte // b, err = json.Marshal(r) // if err != nil { // log.Error("send: ", err.Error()) // return err, nil // } // log.Info("send: b: ", b) // log.Info("send: TypeOf(b): ", reflect.TypeOf(b)) // err = json.Unmarshal(b, &resp) // if err != nil { // log.Error("send: ", err.Error()) // return err, nil // } return nil, r } return err, nil } else { log.Info("send: No response for messageID: ", token.(*mqtt.PublishToken).MessageID()) } return nil, nil } func (broker_mqtt *SssMgrMqtt) notify(p_resp string) (err error) { log.Info(">>> notify") return nil }