Skip to content
mqtt.go 4.93 KiB
Newer Older
 * Copyright (c) 2022  The AdvantEDGE Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package vistrafficmgr
	"encoding/hex"
	"errors"
	"fmt"
	"net/url"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type message_broker_mqtt struct {
	running    bool
	opts       *mqtt.ClientOptions
	client     mqtt.Client
	v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)
var _v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)

func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
		if _v2x_notify != nil {
			if msg.Topic() == "3gpp/v2x/obu/vam" { // FIXME FSCOM Need to manage how to extract message type & message version
				_v2x_notify(msg.Payload(), 16, 2, "ETSI", nil, nil)
			}
		} else {
			log.Info("onMessageReceived: null pointer for the callbacl")
		}
	}()
}

var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
	go func() {
		log.Info("mqtt.OnConnectHandler: Connected")
	}()
}

var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
	log.Info("Connect lost:", err)
}

func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
	log.Debug(">>> Init")
	broker_mqtt.running = false

	u, err := url.ParseRequestURI(tm.broker)
	if err != nil {
		log.Error(err.Error())
		return err
	}

	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
	broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	broker_mqtt.opts.CleanSession = true
	broker_mqtt.opts.SetUsername("")
	broker_mqtt.opts.SetPassword("")
	log.Info("Init: Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	log.Info("Init: Connect to MQTT server...")
	token := broker_mqtt.client.Connect()
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	// Subscribe
	log.Info("Init: Subscribe to: ", tm.topic+"/+")
	token = broker_mqtt.client.Subscribe(tm.topic+"/+", 0, nil) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	_v2x_notify = broker_mqtt.v2x_notify
	if _v2x_notify == nil {
		log.Error("Init: _v2x_notify is nil")
	}

	broker_mqtt.running = true
	log.Info("Init: Client is connected")
func (broker_mqtt *message_broker_mqtt) Run(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Run")
	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}
func (broker_mqtt *message_broker_mqtt) Stop(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Stop")
	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}

	token := broker_mqtt.client.Unsubscribe(tm.topic)
	if token.Error() != nil {
		log.Error(token.Error())
		// Continue
	}
	token.Wait()
	broker_mqtt.client.Disconnect(250)
	broker_mqtt.running = false
func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, msgEncodeFormat string, stdOrganization string, msgType *int32) (err error) {
	log.Info("message_broker_mqtt: Send")

	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}
	// Publish message
	if msgEncodeFormat == "hexadump" {
		content, err := hex.DecodeString(msgContent)
		if err != nil {
			log.Error(err.Error())
			return err
		}
		log.Info("message_broker_simu: Send: Publish content : ", content)
		log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat)
		log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization)
		token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
		token.Wait()

		return nil
	}

	err = errors.New("MQTT encoding not supported, message is discarded")
	log.Error(err.Error())
	return err