Commit 648fbb1f authored by Yann Garcia's avatar Yann Garcia
Browse files

Add oneM2M MQTT support

parent c8c7efe1
Loading
Loading
Loading
Loading
+29 −0
Original line number Diff line number Diff line
package sssmgr

type SssMgrBindingProtocol interface {
	init(tm *SssMgr) (err error)
	send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{})
	//(p_op string, p_type string, p_to string, p_from string, p_ri string, p_rev []string, p_bodyMap map[string]map[string]interface{}) (err error, resp map[string]map[string]interface{})
	notify(p_resp string) (err error)
	uninit() (err error)
}

type SssMgrBindingProtocolContext struct {
	host    string
	port    int
	name    string
	to      string
	from    string
	op      int
	ty      int
	rqi     string
	rvi     []string
	queries map[string]string
	body    map[string]map[string]interface{}
	code    int
}

/**
TS-0001 V4.22 Table 9.6.2.3-1: Types of parameters in accessControlOperations
TS-0004 V4.22 Table 6.3.4.2.5 1: Interpretation of m2m:operation
**/
+184 −0
Original line number Diff line number Diff line
package sssmgr

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"net/url"
	"reflect"
	"strconv"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	"github.com/gorilla/mux"
)

type SssMgrHttp struct {
}

const (
	headerAccept      = "application/json"
	headerContentType = "application/json"
)

func NewSssMgrHttp() (http_mgr *SssMgrHttp) {
	log.Info(">>> NewSssMgrHttp")
	return new(SssMgrHttp)
}

func (http_mgr *SssMgrHttp) init(tm *SssMgr) (err error) {
	log.Info(">>> init")

	// log.Info("Init: Starting OneM2M Notification server")
	// go func() {
	// 	http.HandleFunc("/", tm.handleRoot)
	// 	err := http.ListenAndServe(":33122", nil)
	// 	if err != nil {
	// 		log.Error(err.Error())
	// 		return
	// 	}
	// 	log.Info("<<< Init: Terminating OneM2M Notification server")
	// }()

	return nil
}

func (http_mgr *SssMgrHttp) handleRoot(w http.ResponseWriter, r *http.Request) {
	log.Debug(">>> handleRoot: ", r)

	w.Header().Set("Content-Type", "application/json; charset=UTF-8")

	// get & validate query param values for sensorIdentifier
	u, _ := url.Parse(r.URL.String())
	log.Info("url: ", u.RequestURI())

	w.WriteHeader(http.StatusOK)
}

func (http_mgr *SssMgrHttp) uninit() (err error) {
	log.Info(">>> uninit")
	return nil
}

func (http_mgr *SssMgrHttp) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
	log.Info(">>> send: ", p_ctx)

	// Build the headers
	var headers = http.Header{}
	headers["Accept"] = []string{headerAccept}
	headers["X-M2M-Origin"] = []string{"C" + p_ctx.from}
	headers["X-M2M-RI"] = []string{p_ctx.rqi}
	headers["X-M2M-RVI"] = p_ctx.rvi
	s := headerContentType
	if p_ctx.ty != -1 {
		s = s + ";ty=" + strconv.Itoa(p_ctx.ty)
	}
	headers["Content-Type"] = []string{s}
	// Build the url
	url := "http://" + p_ctx.host + ":" + strconv.Itoa(p_ctx.port) + "/" + p_ctx.to
	// Set the method
	method := ""
	if p_ctx.op == 1 {
		method = "POST"
	} else if p_ctx.op == 2 {
		method = "GET"
	} else if p_ctx.op == 3 {
		method = "PATCH"
	} else if p_ctx.op == 4 {
		method = "DELETE"
	} else {
		err := errors.New("Invalid p_ctx.op")
		log.Error("send: ", err.Error())
		return err, nil
	}
	// Finalize the body
	log.Debug("send: url=", url)
	if p_ctx.body != nil { // With body
		log.Debug("send: p_ctx.body=", p_ctx.body)
		body, err := json.Marshal(p_ctx.body)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		log.Debug("send: Request body: ", string(body))
		response, err := sendRequest(method, url, headers, bytes.NewBuffer(body), nil, p_ctx.queries, 201)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		log.Debug("send: response: ", string(response))
		err = json.Unmarshal(response, &resp)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		log.Debug("send: response: ", resp)
		log.Debug("send: TypeOf(response): ", reflect.TypeOf(resp))
	} else { // Without body
		response, err := sendRequest(method, url, headers, nil, nil, p_ctx.queries, p_ctx.code)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		log.Debug("send: response: ", string(response))
		err = json.Unmarshal(response, &resp)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		log.Debug("send: response: ", resp)
		log.Debug("send: TypeOf(response): ", reflect.TypeOf(resp))
	}

	return nil, resp
}

func (http_mgr *SssMgrHttp) notify(p_resp string) (err error) {
	log.Info(">>> notify")
	return nil
}

func sendRequest(method string, url string, headers http.Header, body io.Reader, vars map[string]string, query map[string]string, code int) ([]byte, error) {
	//log.Debug(">>> sendRequest: url: ", url)
	//log.Debug(">>> sendRequest: headers: ", headers)

	req, err := http.NewRequest(method, url, body)
	if err != nil || req == nil {
		return nil, err
	}
	if vars != nil {
		req = mux.SetURLVars(req, vars)
	}
	if query != nil {
		q := req.URL.Query()
		for k, v := range query {
			q.Add(k, v)
		}
		req.URL.RawQuery = q.Encode()
	}
	req.Header = headers
	req.Close = true

	//log.Debug("sendRequest: req: ", req)
	rr, err := http.DefaultClient.Do(req)
	if err != nil {
		return nil, err
	}

	// Check the status code is what we expect.
	//log.Debug("sendRequest: rr: ", rr)
	if status := rr.StatusCode; status != code {
		s := fmt.Sprintf("Wrong status code - got %v want %v", status, code)
		return nil, errors.New(s)
	}
	responseData, err := ioutil.ReadAll(rr.Body)
	if err != nil {
		return nil, err
	}
	//log.Debug("sendRequest: responseData: ", responseData)

	return responseData, nil
}
+263 −0
Original line number Diff line number Diff line
package sssmgr

import (
	"encoding/json"
	"errors"
	"fmt"
	"reflect"
	"strconv"
	"sync"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type SssMgrMqtt struct {
	running       bool
	opts          *mqtt.ClientOptions
	client        mqtt.Client
	sent_topic    string
	onem2m_notify func(p_topic string, p_payload []byte)
}

var _onem2m_notify func(p_topic string, p_payload []byte)

var _sync_response *sync.WaitGroup = nil // Used to synchronize the response
var _responses map[uint16]mqtt.Message

func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
		// if _onem2m_notify != nil {
		// 	_onem2m_notify(msg.Topic(), msg.Payload())
		// } else {
		// 	log.Info("onMessageReceived: null pointer for the callbacl")
		// }
	}()
}

func onMessageReceivedReq(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceivedReq: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
		if _onem2m_notify != nil {
			_onem2m_notify(msg.Topic(), msg.Payload())
		} else {
			log.Info("onMessageReceivedReq: null pointer for the callbacl")
		}
	}()
}

func onMessageReceivedResp(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceivedResp: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
		defer _sync_response.Done()
		_responses[msg.MessageID()] = msg
	}()
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	log.Info("mqtt.OnConnectHandler: Connected")
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	log.Info("Connect lost:", err)
}

func NewSssMgrMqtt() (broker_mqtt *SssMgrMqtt) {
	log.Info(">>> NewSssMgrMqtt")
	return new(SssMgrMqtt)
}

func (broker_mqtt *SssMgrMqtt) init(tm *SssMgr) (err error) {
	log.Info(">>> init")

	broker_mqtt.running = false

	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
	broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	broker_mqtt.opts.CleanSession = true
	broker_mqtt.opts.SetUsername("")
	broker_mqtt.opts.SetPassword("")
	log.Info("init: Add brocker: ", fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
	broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%d", tm.host, tm.port))
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	log.Info("init: Connect to MQTT server...")
	token := broker_mqtt.client.Connect()
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	// Subscribe
	log.Info("init: Subscribe to: ", "oneM2M/req/+")
	token = broker_mqtt.client.Subscribe("oneM2M/req/+", 0, onMessageReceivedReq) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	// token.Wait()
	log.Info("init: Subscribe to: ", "/oneM2M/resp/#")
	token = broker_mqtt.client.Subscribe("/oneM2M/resp/#", 0, onMessageReceivedResp) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	_onem2m_notify = broker_mqtt.onem2m_notify
	if _onem2m_notify == nil {
		log.Error("init: _onem2m_notify is nil")
	}

	_sync_response = new(sync.WaitGroup) // Used to synchronize the response
	_responses = make(map[uint16]mqtt.Message, 0)

	broker_mqtt.running = true
	log.Info("init: Client is connected")

	return nil
}

func (broker_mqtt *SssMgrMqtt) uninit() (err error) {
	log.Info(">>> uninit")

	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}

	token := broker_mqtt.client.Unsubscribe("/oneM2M")
	if token.Error() != nil {
		log.Error(token.Error())
		// Continue
	}
	token.Wait()
	broker_mqtt.client.Disconnect(250)
	broker_mqtt.running = false

	_sync_response = nil
	_responses = nil

	return nil
}

func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
	log.Info(">>> send")

	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err, nil
	}

	// Prepare the topic
	broker_mqtt.sent_topic = "/oneM2M/req/" + "C" + p_ctx.from + "/" + p_ctx.name + "/json" // FIXME FSCOM Use parameter for type
	// Complete payload dictionary
	var body = map[string]interface{}{}
	body["op"] = p_ctx.op
	if err != nil {
		log.Error(err.Error())
		return err, nil
	}
	if p_ctx.ty != -1 {
		body["ty"] = p_ctx.ty
	}
	body["fr"] = "C" + p_ctx.from
	body["to"] = p_ctx.to
	body["rqi"] = p_ctx.rqi
	body["rvi"] = p_ctx.rvi[0]
	if p_ctx.body != nil {
		body["pc"] = p_ctx.body
	}
	if p_ctx.queries != nil && len(p_ctx.queries) != 0 {
		d := make(map[string]int, 0)
		for k, v := range p_ctx.queries {
			if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}"
				//body[k] = v
				i, err := strconv.Atoi(v) // body[k], err := strconv.Atoi(v)
				body[k] = i               // + 1
				/**
				 * FIXME FSCOM Based on ACME, there is a different behavior between MQTT & HTTP DISCOVERY to get the list of contaimer.
				 * For HTTP, ty = 3 (AE !?)
				 * FOR MQTT, ty = 4 (CNT)
				 * This is the reason of the 'body[k] = i + 1' for MQTT and not for HTTP
				 */
				if err != nil {
					log.Error(err.Error())
					return err, nil
				}
				continue
			}
			d[k], err = strconv.Atoi(v)
			if err != nil {
				log.Error(err.Error())
				return err, nil
			}
		} // End of 'for' statement
		d["fo"] = 1
		body["fc"] = d
	}
	log.Debug("send: body=", body)

	content, err := json.Marshal(body)
	if err != nil {
		log.Error("send: ", err.Error())
		return err, nil
	}
	log.Debug("send: content: ", string(content))
	log.Debug("send: topic: ", broker_mqtt.sent_topic)

	token := broker_mqtt.client.Publish(broker_mqtt.sent_topic, 0, false, string(content))
	token.Wait()

	// Syncronization with the response
	log.Debug("send: Start waiting for the response")
	_sync_response.Add(1)
	_sync_response.Wait()
	if val, ok := _responses[token.(*mqtt.PublishToken).MessageID()]; ok {
		delete(_responses, token.(*mqtt.PublishToken).MessageID())
		log.Debug("send: Get the response: ", string(val.Payload()))
		var d map[string]interface{}
		err = json.Unmarshal(val.Payload(), &d)
		if err != nil {
			log.Error("send: ", err.Error())
			return err, nil
		}
		if r, ok := d["pc"]; ok {
			log.Debug("send: r: ", r)
			log.Debug("send: TypeOf(r): ", reflect.TypeOf(r))
			// var b []byte
			// b, err = json.Marshal(r)
			// if err != nil {
			// 	log.Error("send: ", err.Error())
			// 	return err, nil
			// }
			// log.Info("send: b: ", b)
			// log.Info("send: TypeOf(b): ", reflect.TypeOf(b))
			// err = json.Unmarshal(b, &resp)
			// if err != nil {
			// 	log.Error("send: ", err.Error())
			// 	return err, nil
			// }
			return nil, r
		}
		return err, nil
	} else {
		log.Info("send: No response for messageID: ", token.(*mqtt.PublishToken).MessageID())
	}

	return nil, nil
}

func (broker_mqtt *SssMgrMqtt) notify(p_resp string) (err error) {
	log.Info(">>> notify")

	return nil
}
+724 −0

File added.

Preview size limit exceeded, changes collapsed.