Newer
Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package sssmgr
import (
"encoding/json"
"errors"
"fmt"
"reflect"
"strconv"
"sync"
log "github.com/InterDigitalInc/AdvantEDGE/go-packages/meep-logger"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
// SssMgrMqtt holds the state of the MQTT binding: the paho client, the
// options it was built from, and bookkeeping used by the methods of this type.
type SssMgrMqtt struct {
// running is set to true at the end of a successful init and reset to false by uninit.
running bool
// opts are the client options assembled in init.
opts *mqtt.ClientOptions
// client is the paho client created from opts in init.
client mqtt.Client
// sent_topic is the topic computed by send for the request being published.
sent_topic string
// onem2m_notify is copied into the package-level _onem2m_notify by init.
onem2m_notify func(p_topic string, p_payload []byte)
}
// Package-level state shared between the MQTT message handlers and the
// SssMgrMqtt methods.
var (
	// _onem2m_notify is the callback invoked by onMessageReceivedReq for
	// every message it receives; init copies it from the SssMgrMqtt instance.
	_onem2m_notify func(p_topic string, p_payload []byte)

	// _sync_response is used to synchronize send with the arrival of a
	// response; it is allocated by init and cleared by uninit.
	_sync_response *sync.WaitGroup

	// _responses stores received response messages keyed by their MQTT
	// message ID; it is allocated by init and cleared by uninit.
	_responses map[uint16]mqtt.Message
)
// onMessageReceived is installed by init as the client's default publish
// handler. It only logs the message; the work is done in a separate goroutine
// so the handler itself returns immediately.
//
// The payload is logged as a string, like the other handlers in this file do.
// Passing the raw []byte to the logger would print a list of decimal byte
// values instead of the message text.
func onMessageReceived(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceived: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())
		// Forwarding to the notification callback is currently disabled here:
		// if _onem2m_notify != nil {
		// 	_onem2m_notify(msg.Topic(), msg.Payload())
		// } else {
		// 	log.Info("onMessageReceived: null pointer for the callbacl")
		// }
	}()
}
// onMessageReceivedReq handles messages delivered on the request
// subscription. It logs the message and hands topic and payload to the
// package-level _onem2m_notify callback when one is registered. The work is
// done in a separate goroutine so the handler itself returns immediately.
func onMessageReceivedReq(client mqtt.Client, msg mqtt.Message) {
	dispatch := func() {
		log.Info("onMessageReceivedReq: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())

		// Without a registered callback there is nobody to notify.
		if _onem2m_notify == nil {
			log.Info("onMessageReceivedReq: null pointer for the callbacl")
			return
		}
		_onem2m_notify(msg.Topic(), msg.Payload())
	}
	go dispatch()
}
// onMessageReceivedResp handles messages delivered on the response
// subscription. It stores the message in _responses under its MQTT message ID
// and then signals _sync_response so a waiting send can continue. The work is
// done in a separate goroutine so the handler itself returns immediately.
//
// _sync_response and _responses are nil until init has allocated them and
// again after uninit has cleared them. A message arriving in either window is
// logged and dropped instead of crashing the process with a nil dereference
// or a write to a nil map.
//
// NOTE(review): the map is still accessed without a lock, and Done is called
// whether or not a sender has called Add; both need a coordinated change with
// the sending side.
func onMessageReceivedResp(client mqtt.Client, msg mqtt.Message) {
	go func() {
		log.Info("onMessageReceivedResp: Received message: ", string(msg.Payload()), " on topic ", msg.Topic())

		// Snapshot the globals once so the nil check and the use agree.
		wg, pending := _sync_response, _responses
		if wg == nil || pending == nil {
			log.Error("onMessageReceivedResp: response store not initialized, dropping message on topic ", msg.Topic())
			return
		}

		// Store first, signal last: the waiter must find the message in the map.
		defer wg.Done()
		pending[msg.MessageID()] = msg
	}()
}
// connectHandler is installed by init as the client's OnConnect callback.
// It only records that the connection was established.
var connectHandler mqtt.OnConnectHandler = func(_ mqtt.Client) {
	log.Info("mqtt.OnConnectHandler: Connected")
}
// connectLostHandler is installed by init as the client's OnConnectionLost
// callback. It only logs the error that caused the connection to drop.
var connectLostHandler mqtt.ConnectionLostHandler = func(_ mqtt.Client, cause error) {
	log.Info("Connect lost:", cause)
}
// NewSssMgrMqtt returns a zero-valued SssMgrMqtt. The instance is not
// connected to anything until init is called on it.
func NewSssMgrMqtt() (broker_mqtt *SssMgrMqtt) {
	log.Info(">>> NewSssMgrMqtt")
	broker_mqtt = &SssMgrMqtt{}
	return broker_mqtt
}
// init configures the paho client, connects it to the broker described by
// tm.host and tm.port, and subscribes the request and response handlers.
//
// On success running is set to true and nil is returned. On failure the
// error reported by the failing token is returned, running stays false, and
// a client that had already connected is disconnected again.
//
// A paho token only carries its error once the operation has completed, so
// every token is waited on before Error is consulted.
func (broker_mqtt *SssMgrMqtt) init(tm *SssMgr) (err error) {
	log.Info(">>> init")

	broker_mqtt.running = false

	// The message handlers read these globals. Set them up before any
	// subscription exists so a message delivered immediately after Subscribe
	// never observes a nil WaitGroup or a nil map.
	_onem2m_notify = broker_mqtt.onem2m_notify
	if _onem2m_notify == nil {
		log.Error("init: _onem2m_notify is nil")
	}
	_sync_response = new(sync.WaitGroup) // Used to synchronize the response
	_responses = make(map[uint16]mqtt.Message)

	// Client options.
	broker_mqtt.opts = mqtt.NewClientOptions()
	broker_mqtt.opts.SetDefaultPublishHandler(onMessageReceived)
	broker_mqtt.opts.SetClientID("AdvantEDGE.meep-vis-traffic-mgr")
	broker_mqtt.opts.OnConnect = connectHandler
	broker_mqtt.opts.OnConnectionLost = connectLostHandler
	broker_mqtt.opts.CleanSession = true
	broker_mqtt.opts.SetUsername("")
	broker_mqtt.opts.SetPassword("")

	brokerURL := fmt.Sprintf("tcp://%s:%d", tm.host, tm.port)
	log.Info("init: Add brocker: ", brokerURL)
	broker_mqtt.opts.AddBroker(brokerURL)
	broker_mqtt.client = mqtt.NewClient(broker_mqtt.opts)

	// Connect.
	log.Info("init: Connect to MQTT server...")
	if token := broker_mqtt.client.Connect(); token.Wait() && token.Error() != nil {
		log.Error(token.Error())
		return token.Error()
	}

	// Subscribe (qos 0).
	// NOTE(review): the request filter has no leading '/', while the response
	// filter here and the request topic built by send both start with '/'.
	// Confirm this asymmetry is intended.
	subscriptions := []struct {
		topic   string
		handler mqtt.MessageHandler
	}{
		{"oneM2M/req/+", onMessageReceivedReq},
		{"/oneM2M/resp/#", onMessageReceivedResp},
	}
	for _, sub := range subscriptions {
		log.Info("init: Subscribe to: ", sub.topic)
		if token := broker_mqtt.client.Subscribe(sub.topic, 0, sub.handler); token.Wait() && token.Error() != nil {
			log.Error(token.Error())
			// running stays false, so uninit would refuse to clean up;
			// release the connection here instead of leaking it.
			broker_mqtt.client.Disconnect(250)
			return token.Error()
		}
	}

	broker_mqtt.running = true
	log.Info("init: Client is connected")

	return nil
}
// uninit removes the subscriptions created by init, disconnects the client
// and clears the package-level response state.
//
// It returns an error when the binding is not running. A failure to
// unsubscribe is logged but does not prevent the disconnect.
func (broker_mqtt *SssMgrMqtt) uninit() (err error) {
	log.Info(">>> uninit")

	if !broker_mqtt.running {
		err = errors.New("MQTT not initialized or diconnected")
		log.Error(err.Error())
		return err
	}

	// Unsubscribe must name the exact filters that were subscribed in init;
	// a different string such as "/oneM2M" matches none of them. The token
	// only carries its error once the operation has completed, so wait first.
	token := broker_mqtt.client.Unsubscribe("oneM2M/req/+", "/oneM2M/resp/#")
	if token.Wait() && token.Error() != nil {
		log.Error(token.Error())
		// Continue
	}

	broker_mqtt.client.Disconnect(250)
	broker_mqtt.running = false

	_sync_response = nil
	_responses = nil

	return nil
}
func (broker_mqtt *SssMgrMqtt) send(p_ctx SssMgrBindingProtocolContext) (err error, resp interface{}) {
log.Info(">>> send")
// Sanity checks
if !broker_mqtt.running {
err := errors.New("MQTT not initialized or diconnected")
log.Error(err.Error())
return err, nil
}
// Prepare the topic
broker_mqtt.sent_topic = "/oneM2M/req/" + "C" + p_ctx.from + "/" + p_ctx.name + "/json" // FIXME FSCOM Use parameter for type
// Complete payload dictionary
var body = map[string]interface{}{}
body["op"] = p_ctx.op
if err != nil {
log.Error(err.Error())
return err, nil
}
if p_ctx.ty != -1 {
body["ty"] = p_ctx.ty
}
body["fr"] = "C" + p_ctx.from
body["to"] = p_ctx.to
body["rqi"] = p_ctx.rqi
body["rvi"] = p_ctx.rvi[0]
if p_ctx.body != nil {
body["pc"] = p_ctx.body
}
if p_ctx.queries != nil && len(p_ctx.queries) != 0 {
d := make(map[string]int, 0)
for k, v := range p_ctx.queries {
if k == "ty" { // mosquitto_pub -d -q 0 -h 172.29.10.56 -p 1883 -t "/oneM2M/req/CAdmin/laboai-acme-ic-cse/json" -m "{\"fr\":\"CAdmin\",\"op\":2,\"rqi\":\"432bb877-7dc5-4e4d-b424-9c0d50604596\",\"rvi\":\"4\",\"to\":\"laboai-cse-in/YannouDomainAutomation/YannouGardenZone0\",\"ty\":3,\"fc\":{\"fu\":2,\"fo\":1}}"
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
if err != nil {
log.Error(err.Error())
return err, nil
}
continue
}
d[k], err = strconv.Atoi(v)
if err != nil {
log.Error(err.Error())
return err, nil
}
} // End of 'for' statement
d["fo"] = 1
body["fc"] = d
}
log.Debug("send: body=", body)
content, err := json.Marshal(body)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
log.Debug("send: content: ", string(content))
log.Debug("send: topic: ", broker_mqtt.sent_topic)
token := broker_mqtt.client.Publish(broker_mqtt.sent_topic, 0, false, string(content))
token.Wait()
// Syncronization with the response
log.Debug("send: Start waiting for the response")
_sync_response.Add(1)
_sync_response.Wait()
if val, ok := _responses[token.(*mqtt.PublishToken).MessageID()]; ok {
delete(_responses, token.(*mqtt.PublishToken).MessageID())
log.Debug("send: Get the response: ", string(val.Payload()))
var d map[string]interface{}
err = json.Unmarshal(val.Payload(), &d)
if err != nil {
log.Error("send: ", err.Error())
return err, nil
}
if r, ok := d["pc"]; ok {
log.Debug("send: r: ", r)
log.Debug("send: TypeOf(r): ", reflect.TypeOf(r))
// var b []byte
// b, err = json.Marshal(r)
// if err != nil {
// log.Error("send: ", err.Error())
// return err, nil
// }
// log.Info("send: b: ", b)
// log.Info("send: TypeOf(b): ", reflect.TypeOf(b))
// err = json.Unmarshal(b, &resp)
// if err != nil {
// log.Error("send: ", err.Error())
// return err, nil
// }
return nil, r
}
return err, nil
} else {
log.Info("send: No response for messageID: ", token.(*mqtt.PublishToken).MessageID())
}
return nil, nil
}
// notify is currently a stub: it logs its invocation, ignores p_resp and
// always reports success.
func (broker_mqtt *SssMgrMqtt) notify(p_resp string) (err error) {
	log.Info(">>> notify")

	// Nothing is sent yet, so there is no failure to report.
	return nil
}