Newer
Older
* Copyright (c) 2022 The AdvantEDGE Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
"encoding/hex"
"errors"
"fmt"
"net/url"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type message_broker_mqtt struct {
running bool
opts *mqtt.ClientOptions
client mqtt.Client
v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)
var _v2x_notify func(v2xMessage []byte, v2xType int32, msgProtocolVersion int32, stdOrganization string, longitude *float32, latitude *float32)
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
go func() {
log.Info("onMessageReceived: Received message: ", msg.Payload(), " on topic ", msg.Topic())
if _v2x_notify != nil {
if msg.Topic() == "3gpp/v2x/obu/vam" { // FIXME FSCOM Need to manage how to extract message type & message version
_v2x_notify(msg.Payload(), 16, 2, "ETSI", nil, nil)
}
} else {
log.Info("onMessageReceived: null pointer for the callbacl")
}
}()
}
var connectHandler mqtt.OnConnectHandler = func(client mqtt.Client) {
go func() {
log.Info("mqtt.OnConnectHandler: Connected")
}()
}
var connectLostHandler mqtt.ConnectionLostHandler = func(client mqtt.Client, err error) {
log.Info("Connect lost:", err)
}
func (broker_mqtt *message_broker_mqtt) Init(tm *TrafficMgr) (err error) {
u, err := url.ParseRequestURI(tm.broker)
if err != nil {
log.Error(err.Error())
return err
}
broker_mqtt.opts = mqtt.NewClientOptions()
broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
broker_mqtt.opts.OnConnect = connectHandler
broker_mqtt.opts.OnConnectionLost = connectLostHandler
broker_mqtt.opts.CleanSession = true
broker_mqtt.opts.SetUsername("")
broker_mqtt.opts.SetPassword("")
log.Info("Init: Add brocker: ", fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
broker_mqtt.opts.AddBroker(fmt.Sprintf("tcp://%s:%s", u.Hostname(), u.Port()))
broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)
log.Info("Init: Connect to MQTT server...")
token := broker_mqtt.client.Connect()
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
// Subscribe
log.Info("Init: Subscribe to: ", tm.topic+"/+")
token = broker_mqtt.client.Subscribe(tm.topic+"/+", 0, nil) // qos:0
if token.Error() != nil {
log.Error(token.Error())
return token.Error()
}
token.Wait()
_v2x_notify = broker_mqtt.v2x_notify
if _v2x_notify == nil {
log.Error("Init: _v2x_notify is nil")
}
log.Info("Init: Client is connected")
func (broker_mqtt *message_broker_mqtt) Run(tm *TrafficMgr) (err error) {
log.Debug(">>> message_broker_mqtt: Run")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
func (broker_mqtt *message_broker_mqtt) Stop(tm *TrafficMgr) (err error) {
log.Debug(">>> message_broker_mqtt: Stop")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
token := broker_mqtt.client.Unsubscribe(tm.topic)
if token.Error() != nil {
log.Error(token.Error())
// Continue
}
token.Wait()
broker_mqtt.client.Disconnect(250)
broker_mqtt.running = false
func (broker_mqtt *message_broker_mqtt) Send(tm *TrafficMgr, msgContent string, msgEncodeFormat string, stdOrganization string, msgType *int32) (err error) {
log.Info("message_broker_mqtt: Send")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err
}
// Publish message
if msgEncodeFormat == "hexadump" {
content, err := hex.DecodeString(msgContent)
if err != nil {
log.Error(err.Error())
return err
}
log.Info("message_broker_mqtt: Send: Publish content : ", content)
log.Info("message_broker_mqtt: Send: msgEncodeFormat: ", msgEncodeFormat)
log.Info("message_broker_mqtt: Send: stdOrganization: ", stdOrganization)
token := broker_mqtt.client.Publish(tm.topic, 0, false, content)
token.Wait()
return nil
}
err = errors.New("MQTT encoding not supported, message is discarded")
log.Error(err.Error())
return err