/*
 * Copyright (c) 2022  The AdvantEDGE Authors
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package vistrafficmgr

import (
	"encoding/hex"
	"errors"
	"fmt"
	"net/url"

	log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
	mqtt "github.com/eclipse/paho.mqtt.golang"
)

type message_broker_mqtt struct {
	running bool
	opts    *mqtt.ClientOptions
	client  mqtt.Client
// var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
// 	log.Info("Received message: from topic: ", msg.Payload(), "on topic ", msg.Topic())
// 	fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
// }
// onMessageReceived is the message handler registered with the MQTT client
// options. It reports the payload and topic of each incoming message on both
// the logger and standard output. The reporting runs in its own goroutine so
// the handler itself returns immediately.
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("Received message: ", msg.Payload(), " from topic: ", msg.Topic())
		// Printf, not Println: the string carries %s verbs that must be expanded.
		fmt.Printf("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
	}()
}

// connectHandler is the on-connect callback. It announces the connection on
// the logger and on standard output from a separate goroutine.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	announce := func() {
		log.Info("Connected")
		fmt.Println("====> Connected")
	}
	go announce()
}

// connectLostHandler is the connection-lost callback. It logs the error that
// was reported together with the loss of the connection.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	log.Info("Connect lost:", cause)
}

func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Init")
	broker_mqtt.running = false

	u, err := url.ParseRequestURI(tm.broker)
	if err != nil {
		log.Error(err.Error())
		return err
	}

	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived /*messagePubHandler*/)
	//broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	//broker_mqtt.opts.SetUsername("emqx")
	//broker_mqtt.opts.SetPassword("public")
	log.Info("Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	log.Info("Connect to MQTT server...")
	token := broker_mqtt.client.Connect()
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	// Subscribe
	log.Info("Subscribe to: ", tm.topic)
	token = broker_mqtt.client.Subscribe(tm.topic, 0, nil) // qos:0
	if token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}
	token.Wait()

	broker_mqtt.running = true
	log.Info("mqtt.Init: Client is connected")
func (broker_mqtt *message_broker_mqtt) Run(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Run")
	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}
func (broker_mqtt *message_broker_mqtt) Stop(tm *TrafficMgr) (err error) {
	log.Debug(">>> message_broker_mqtt: Stop")
	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}

	token := broker_mqtt.client.Unsubscribe(tm.topic)
	if token.Error() != nil {
		log.Error(token.Error())
		// Continue
	}
	token.Wait()
	broker_mqtt.client.Disconnect(250)
	broker_mqtt.running = false
// Send publishes msgContent on tm.topic.
//
// Only the "hexadump" encoding is accepted: msgContent is decoded from its
// hexadecimal text form and the resulting bytes are published with QoS 0 and
// the retained flag cleared. stdOrganization is only logged; msgType is not
// used in the visible code.
//
// It returns an error when the running flag is not set, when msgContent is
// not valid hexadecimal, or when msgEncodeFormat is anything other than
// "hexadump" (the message is then discarded).
func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, msgEncodeFormat string, stdOrganization string, msgType *int32) (err error) {
	log.Info("message_broker_mqtt: Send")

	// Sanity checks
	if !broker_mqtt.running {
		err := errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}
	// Publish message
	if msgEncodeFormat == "hexadump" {
		// The payload arrives as hexadecimal text; convert it to raw bytes.
		content, err := hex.DecodeString(msgContent)
		if err != nil {
			log.Error(err.Error())
			return err
		}
		// NOTE(review): these log lines are prefixed "message_broker_simu"
		// although this is the MQTT broker — looks like a copy/paste leftover.
		log.Info("message_broker_simu: Send: Publish content : ", content)
		log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat)
		log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization)
		token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
		// NOTE(review): token.Error() is never inspected after the wait, so a
		// failed publish is still reported as success — confirm and handle.
		token.Wait()

		return nil
	}

	// Any other encoding is rejected.
	err = errors.New("MQTT encoding not supported, message is discarded")
	log.Error(err.Error())
	return err