/* * Copyright (c) 2022 The AdvantEDGE Authors * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ package vistrafficmgr import ( "encoding/hex" "errors" "fmt" "net/url" log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger" mqtt "github.com/eclipse/paho.mqtt.golang" ) type message_broker_mqtt struct { running bool opts *mqtt.ClientOptions client mqtt.Client } // var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) { // log.Info("Received message: from topic: ", msg.Payload(), "on topic ", msg.Topic()) // fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) // } func onMessageReceived(client mqtt.Client, msg mqtt.Message) { go func() { log.Info("Received message: from topic: ", msg.Payload(), "on topic ", msg.Topic()) fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic()) }() } var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) { go func() { log.Info("Connected") fmt.Println("====> Connected") }() } var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) { log.Info("Connect lost:", err) } func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) { log.Debug(">>> message_broker_mqtt: Init") broker_mqtt.running = false u, err := url.ParseRequestURI(tm.broker) if err != nil { log.Error(err.Error()) return err } broker_mqtt.opts = mqtt.NewClientOptions() broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived /*messagePubHandler*/) //broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr") broker_mqtt.opts.OnConnect = connectHandler broker_mqtt.opts.OnConnectionLost = connectLostHandler //broker_mqtt.opts.SetUsername("emqx") //broker_mqtt.opts.SetPassword("public") log.Info("Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port())) broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port())) broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts) log.Info("Connect to MQTT server...") token := broker_mqtt.client.Connect() if token.Error() != nil { log.Error(token.Error()) return token.Error() } token.Wait() // Subscribe log.Info("Subscribe to: ", tm.topic) token = broker_mqtt.client.Subscribe(tm.topic, 0, nil) // qos:0 if token.Error() != nil { log.Error(token.Error()) return token.Error() } token.Wait() broker_mqtt.running = true log.Info("mqtt.Init: Client is connected") return nil } func (broker_mqtt *message_broker_mqtt) Run(tm *TrafficMgr) (err error) { log.Debug(">>> message_broker_mqtt: Run") // Sanity checks if !broker_mqtt.running { err := errors.New("MQTT not initialized or diconnected") log.Error(err.Error()) return err } return nil } func (broker_mqtt *message_broker_mqtt) Stop(tm *TrafficMgr) (err error) { log.Debug(">>> message_broker_mqtt: Stop") // Sanity checks if !broker_mqtt.running { err := errors.New("MQTT not initialized or diconnected") log.Error(err.Error()) return err } token := broker_mqtt.client.Unsubscribe(tm.topic) if token.Error() != nil { log.Error(token.Error()) // Continue } token.Wait() broker_mqtt.client.Disconnect(250) broker_mqtt.running = false return nil } func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, msgEncodeFormat string, stdOrganization string, msgType *int32) (err error) { log.Info("message_broker_mqtt: Send") // Sanity checks if !broker_mqtt.running { err := errors.New("MQTT not initialized or diconnected") log.Error(err.Error()) return err } // Publish message if msgEncodeFormat == "hexadump" { content, err := hex.DecodeString(msgContent) if err != nil { log.Error(err.Error()) return err } log.Info("message_broker_simu: Send: Publish content : ", content) log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat) log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization) token := broker_mqtt.client.Publish(tm.topic, 0, false, content) token.Wait() return nil } err = errors.New("MQTT encoding not supported, message is discarded") log.Error(err.Error()) return err }