/*
* Copyright (c) 2022 The AdvantEDGE Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"encoding/hex"
"errors"
"fmt"
"net/url"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type message_broker_mqtt struct {
running bool
opts *mqtt.ClientOptions
client mqtt.Client
// var messagePubHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
// log.Info("Received message: ", msg.Payload(), "on topic ", msg.Topic())
// fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
// }
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
go func() {
Yann Garcia
committed
log.Info("Received message: ", msg.Payload(), "on topic ", msg.Topic())
fmt.Println("Received message: %s from topic: %s\n", msg.Payload(), msg.Topic())
}()
}
// connectHandler is an mqtt.OnConnectHandler that reports a successful
// connection, both through the logger and on standard output. The report is
// emitted from its own goroutine so the callback returns right away.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	announce := func() {
		log.Info("Connected")
		fmt.Println("====> Connected")
	}
	go announce()
}
// connectLostHandler is an mqtt.ConnectionLostHandler that logs the error
// reported together with the connection loss.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	log.Info("Connect lost:", cause)
}
func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
log.Debug(">>> message_broker_mqtt: Init")
u, err := url.ParseRequestURI(tm.broker)
if err != nil {
log.Error(err.Error())
return err
}
broker_mqtt.opts = mqtt.NewClientOptions()
broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived /*messagePubHandler*/)
//broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
broker_mqtt.opts.OnConnect = connectHandler
broker_mqtt.opts.OnConnectionLost = connectLostHandler
//broker_mqtt.opts.SetUsername("emqx")
//broker_mqtt.opts.SetPassword("public")
log.Info("Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)
log.Info("Connect to MQTT server...")
token := broker_mqtt.client.Connect()
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
// Subscribe
log.Info("Subscribe to: ", tm.topic)
token = broker_mqtt.client.Subscribe(tm.topic, 0, nil) // qos:0
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
broker_mqtt.running = true
log.Info("mqtt.Init: Client is connected")
func (broker_mqtt *message_broker_mqtt) Run(tm *TrafficMgr) (err error) {
log.Debug(">>> message_broker_mqtt: Run")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
func (broker_mqtt *message_broker_mqtt) Stop(tm *TrafficMgr) (err error) {
log.Debug(">>> message_broker_mqtt: Stop")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
token := broker_mqtt.client.Unsubscribe(tm.topic)
if token.Error() != nil {
log.Error(token.Error())
// Continue
}
token.Wait()
broker_mqtt.client.Disconnect(250)
broker_mqtt.running = false
func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, msgEncodeFormat string, stdOrganization string, msgType *int32) (err error) {
log.Info("message_broker_mqtt: Send")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
// Publish message
if msgEncodeFormat == "hexadump" {
content, err := hex.DecodeString(msgContent)
if err != nil {
log.Error(err.Error())
return err
}
log.Info("message_broker_simu: Send: Publish content : ", content)
log.Info("message_broker_simu: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_simu: Send: stdOrganization: ", stdOrganization)
token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
token.Wait()
return nil
}
err = errors.New("MQTT encoding not supported, message is discarded")
log.Error(err.Error())
return err