Skip to content
mqtt.go 7.3 KiB
Newer Older
Yann Garcia's avatar
Yann Garcia committed
package sssmgr

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"sync"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type SssMgrMqtt struct {
	running       bool
	opts          *mqtt.ClientOptions
	client        mqtt.Client
	sent_topic    string
	onem2m_notify func(p_topic string, p_payload []byte)
}

var _onem2m_notify func(p_topic string, p_payload []byte)

var _sync_response *sync.WaitGroup = nil // Used to synchronize the response
var _responses map[uint16]mqtt.Message

func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
		// if _onem2m_notify != nil {
		// 	_onem2m_notify(msg.Topic(), msg.Payload())
		// } else {
		// 	log.Info("onMessageReceived: null pointer for the callbacl")
		// }
	}()
}

func onMessageReceivedReq(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceivedReq: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
		if _onem2m_notify != nil {
			_onem2m_notify(msg.Topic(), msg.Payload())
		} else {
			log.Info("onMessageReceivedReq: null pointer for the callbacl")
		}
	}()
}

func onMessageReceivedResp(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceivedResp: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
		defer _sync_response.Done()
		_responses[msg.MessageID()] = msg
	}()
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Info("mqtt.OnConnectHandler: Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	log.Info("Connect lost:", err)
}

func NewSssMgrMqtt() (broker_mqtt *SssMgrMqtt) {
	log.Info(">>> NewSssMgrMqtt")
	return new(SssMgrMqtt)
}

func (broker_mqtt *SssMgrMqtt) init(tm *SssMgr) (err error) {
	log.Info(">>> init")

	broker_mqtt.running = false

	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
	broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	broker_mqtt.opts.CleanSession = true
	broker_mqtt.opts.SetUsername("")
	broker_mqtt.opts.SetPassword("")
	log.Info("init: Add brocker: ", fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
	broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	log.Info("init: Connect to MQTT server...")
	token := broker_mqtt.client.Connect()
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	// Subscribe
	log.Info("init: Subscribe to: ", "oneM2M/req/+")
	token = broker_mqtt.client.Subscribe("oneM2M/req/+", 0, onMessageReceivedReq) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	// token.Wait()
	log.Info("init: Subscribe to: ", "/oneM2M/resp/#")
	token = broker_mqtt.client.Subscribe("/oneM2M/resp/#", 0, onMessageReceivedResp) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	_onem2m_notify = broker_mqtt.onem2m_notify
	if _onem2m_notify == nil {
		log.Error("init: _onem2m_notify is nil")
	}

	_sync_response = new(sync.WaitGroup) // Used to synchronize the response
	_responses = make(map[uint16]mqtt.Message, 0)

	broker_mqtt.running = true
	log.Info("init: Client is connected")

	return nil
}

func (broker_mqtt *SssMgrMqtt) uninit() (err error) {
	log.Info(">>> uninit")

	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}

	token := broker_mqtt.client.Unsubscribe("/oneM2M")
	if token.Error() != nil {
		log.Error(token.Error())
		// Continue
	}
	token.Wait()
	broker_mqtt.client.Disconnect(250)
	broker_mqtt.running = false

	_sync_response = nil
	_responses = nil

	return nil
}

func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
	log.Info(">>> send")

	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err, nil
	}

	// Prepare the topic
	broker_mqtt.sent_topic = "/oneM2M/req/" + "C" + p_ctx.from + "/" + p_ctx.name + "/json" // FIXME FSCOM Use parameter for type
	// Complete payload dictionary
	var body = map[string]interface{}{}
	body["op"] = p_ctx.op
	if err != nil {
		log.Error(err.Error())
		return err, nil
	}
	if p_ctx.ty != -1 {
		body["ty"] = p_ctx.ty
	}
	body["fr"] = "C" + p_ctx.from
	body["to"] = p_ctx.to
	body["rqi"] = p_ctx.rqi
	body["rvi"] = p_ctx.rvi[0]
	if p_ctx.body != nil {
		body["pc"] = p_ctx.body
	}
	if p_ctx.queries != nil && len(p_ctx.queries) != 0 {
		d := make(map[string]int, 0)
		for k, v := range p_ctx.queries {
			if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}"
				//body[k] = v
				i, err := strconv.Atoi(v) // body[k], err := strconv.Atoi(v)
				body[k] = i               // + 1
				/**
				 * FIXME FSCOM Based on ACME, there is a different behavior between MQTT & HTTP DISCOVERY to get the list of contaimer.
				 * For HTTP, ty = 3 (AE !?)
				 * FOR MQTT, ty = 4 (CNT)
				 * This is the reason of the 'body[k] = i + 1' for MQTT and not for HTTP
				 */
				if err != nil {
					log.Error(err.Error())
					return err, nil
				}
				continue
			}
			d[k], err = strconv.Atoi(v)
			if err != nil {
				log.Error(err.Error())
				return err, nil
			}
		} // End of 'for' statement
		d["fo"] = 1
		body["fc"] = d
	}
	log.Debug("send: body=", body)

	content, err := json.Marshal(body)
	if err != nil {
		log.Error("send: ", err.Error())
		return err, nil
	}
	log.Debug("send: content: ", string(content))
	log.Debug("send: topic: ", broker_mqtt.sent_topic)

	token := broker_mqtt.client.Publish(broker_mqtt.sent_topic, 0, false, string(content))
	token.Wait()

	// Syncronization with the response
	log.Debug("send: Start waiting for the response")
	_sync_response.Add(1)
	_sync_response.Wait()
	if val, ok := _responses[token.(*mqtt.PublishToken).MessageID()]; ok {
		delete(_responses, token.(*mqtt.PublishToken).MessageID())
		log.Debug("send: Get the response: ", string(val.Payload()))
		var d map[string]interface{}
		err = json.Unmarshal(val.Payload(), &d)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		if r, ok := d["pc"]; ok {
			log.Debug("send: r: ", r)
			log.Debug("send: TypeOf(r): ", reflect.TypeOf(r))
			// var b []byte
			// b, err = json.Marshal(r)
			// if err != nil {
			// 	log.Error("send: ", err.Error())
			// 	return err, nil
			// }
			// log.Info("send: b: ", b)
			// log.Info("send: TypeOf(b): ", reflect.TypeOf(b))
			// err = json.Unmarshal(b, &resp)
			// if err != nil {
			// 	log.Error("send: ", err.Error())
			// 	return err, nil
			// }
			return nil, r
		}
		return err, nil
	} else {
		log.Info("send: No response for messageID: ", token.(*mqtt.PublishToken).MessageID())
	}

	return nil, nil
}

func (broker_mqtt *SssMgrMqtt) notify(p_resp string) (err error) {
	log.Info(">>> notify")

	return nil
}